Compare commits

..

No commits in common. "v3" and "v3.8.0" have entirely different histories.
v3 ... v3.8.0

38 changed files with 347 additions and 3563 deletions

View File

@ -1,20 +0,0 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -1,21 +0,0 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@ -3,20 +3,19 @@ on:
push: push:
branches: branches:
- master - master
- v3
jobs: jobs:
test: test:
name: test name: test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: setup - name: setup
uses: actions/setup-go@v3 uses: actions/setup-go@v2
with: with:
go-version: 1.17 go-version: 1.16
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: cache - name: cache
uses: actions/cache@v3 uses: actions/cache@v2
with: with:
path: ~/go/pkg/mod path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
@ -32,9 +31,9 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: lint - name: lint
uses: golangci/golangci-lint-action@v3.4.0 uses: golangci/golangci-lint-action@v2
continue-on-error: true continue-on-error: true
with: with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.

View File

@ -9,7 +9,7 @@
# the `language` matrix defined below to confirm you have the correct set of # the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages. # supported CodeQL languages.
# #
name: "codeql" name: "CodeQL"
on: on:
workflow_run: workflow_run:
@ -17,16 +17,16 @@ on:
types: types:
- completed - completed
push: push:
branches: [ master, v3 ] branches: [ master ]
pull_request: pull_request:
# The branches below must be a subset of the branches above # The branches below must be a subset of the branches above
branches: [ master, v3 ] branches: [ master ]
schedule: schedule:
- cron: '34 1 * * 0' - cron: '34 1 * * 0'
jobs: jobs:
analyze: analyze:
name: analyze name: Analyze
runs-on: ubuntu-latest runs-on: ubuntu-latest
permissions: permissions:
actions: read actions: read
@ -42,15 +42,12 @@ jobs:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps: steps:
- name: checkout - name: Checkout repository
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning. # Initializes the CodeQL tools for scanning.
- name: init - name: Initialize CodeQL
uses: github/codeql-action/init@v2 uses: github/codeql-action/init@v1
with: with:
languages: ${{ matrix.language }} languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file. # If you wish to specify custom queries, you can do so here or in a config file.
@ -60,8 +57,8 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below) # If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild - name: Autobuild
uses: github/codeql-action/autobuild@v2 uses: github/codeql-action/autobuild@v1
# Command-line programs to run using the OS shell. # Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl # 📚 https://git.io/JvXDl
@ -74,5 +71,5 @@ jobs:
# make bootstrap # make bootstrap
# make release # make release
- name: analyze - name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2 uses: github/codeql-action/analyze@v1

View File

@ -1,27 +1,66 @@
name: "dependabot-automerge" name: "prautomerge"
on: on:
pull_request_target: workflow_run:
types: [assigned, opened, synchronize, reopened] workflows: ["prbuild"]
types:
- completed
permissions: permissions:
pull-requests: write
contents: write contents: write
pull-requests: write
jobs: jobs:
automerge: Dependabot-Automerge:
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]' # Contains workaround to execute if dependabot updates the PR by checking for the base branch in the linked PR
# The the github.event.workflow_run.event value is 'push' and not 'pull_request'
# dont work with multiple workflows when last returns success
if: >-
github.event.workflow_run.conclusion == 'success'
&& github.actor == 'dependabot[bot]'
&& github.event.sender.login == 'dependabot[bot]'
&& github.event.sender.type == 'Bot'
&& (github.event.workflow_run.event == 'pull_request'
|| (github.event.workflow_run.event == 'push' && github.event.workflow_run.pull_requests[0].base.ref == github.event.repository.default_branch ))
steps: steps:
- name: metadata - name: Approve Changes and Merge changes if label 'dependencies' is set
id: metadata uses: actions/github-script@v5
uses: dependabot/fetch-metadata@v1.3.6
with: with:
github-token: "${{ secrets.TOKEN }}" github-token: ${{ secrets.GITHUB_TOKEN }}
- name: merge script: |
id: merge console.log(context.payload.workflow_run);
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL" var labelNames = await github.paginate(
env: github.issues.listLabelsOnIssue,
PR_URL: ${{github.event.pull_request.html_url}} {
GITHUB_TOKEN: ${{secrets.TOKEN}} repo: context.repo.repo,
owner: context.repo.owner,
issue_number: context.payload.workflow_run.pull_requests[0].number,
},
(response) => response.data.map(
(label) => label.name
)
);
console.log(labelNames);
if (labelNames.includes('dependencies')) {
console.log('Found label');
await github.pulls.createReview({
repo: context.repo.repo,
owner: context.repo.owner,
pull_number: context.payload.workflow_run.pull_requests[0].number,
event: 'APPROVE'
});
console.log('Approved PR');
await github.pulls.merge({
repo: context.repo.repo,
owner: context.repo.owner,
pull_number: context.payload.workflow_run.pull_requests[0].number,
});
console.log('Merged PR');
}

View File

@ -3,20 +3,19 @@ on:
pull_request: pull_request:
branches: branches:
- master - master
- v3
jobs: jobs:
test: test:
name: test name: test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: setup - name: setup
uses: actions/setup-go@v3 uses: actions/setup-go@v2
with: with:
go-version: 1.17 go-version: 1.16
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: cache - name: cache
uses: actions/cache@v3 uses: actions/cache@v2
with: with:
path: ~/go/pkg/mod path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
@ -32,9 +31,9 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: lint - name: lint
uses: golangci/golangci-lint-action@v3.4.0 uses: golangci/golangci-lint-action@v2
continue-on-error: true continue-on-error: true
with: with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.

24
.gitignore vendored
View File

@ -1,24 +0,0 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

22
go.mod
View File

@ -1,24 +1,8 @@
module go.unistack.org/micro-server-http/v3 module go.unistack.org/micro-server-http/v3
go 1.21 go 1.16
toolchain go1.23.1
require ( require (
go.unistack.org/micro-client-http/v3 v3.9.12 go.unistack.org/micro/v3 v3.8.5
go.unistack.org/micro-codec-yaml/v3 v3.10.1 golang.org/x/net v0.0.0-20211020060615-d418f374d309
go.unistack.org/micro-proto/v3 v3.4.1
go.unistack.org/micro/v3 v3.10.88
golang.org/x/net v0.29.0
)
require (
github.com/google/gnostic v0.7.0 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
golang.org/x/sys v0.25.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.66.2 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
) )

1607
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -6,21 +6,14 @@ import (
"io" "io"
"net/http" "net/http"
"reflect" "reflect"
"slices"
"strconv"
"strings" "strings"
"sync" "sync"
"time"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/tracer"
rhttp "go.unistack.org/micro/v3/util/http" rhttp "go.unistack.org/micro/v3/util/http"
rflutil "go.unistack.org/micro/v3/util/reflect" rflutil "go.unistack.org/micro/v3/util/reflect"
) )
@ -38,13 +31,14 @@ var (
type patHandler struct { type patHandler struct {
mtype *methodType mtype *methodType
rcvr reflect.Value rcvr reflect.Value
pat *rhttp.Trie
name string name string
} }
type httpHandler struct { type httpHandler struct {
opts server.HandlerOptions opts server.HandlerOptions
hd interface{} hd interface{}
handlers *rhttp.Trie handlers map[string][]patHandler
name string name string
eps []*register.Endpoint eps []*register.Endpoint
sopts server.Options sopts server.Options
@ -67,477 +61,82 @@ func (h *httpHandler) Options() server.HandlerOptions {
return h.opts return h.opts
} }
func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) { func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if handler == nil { if ph, _, ok := h.pathHandlers.Search(r.Method, r.URL.Path); ok {
return nil, fmt.Errorf("invalid handler specified: %v", handler) ph.(http.HandlerFunc)(w, r)
return
} }
rtype := reflect.TypeOf(handler)
if rtype.NumIn() != 3 {
return nil, fmt.Errorf("invalid handler, NumIn != 3: %v", rtype.NumIn())
}
argType := rtype.In(1)
replyType := rtype.In(2)
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
return nil, fmt.Errorf("invalid handler, argument type not exported: %v", argType)
}
if replyType.Kind() != reflect.Ptr {
return nil, fmt.Errorf("invalid handler, reply type not a pointer: %v", replyType)
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
return nil, fmt.Errorf("invalid handler, reply type not exported: %v", replyType)
}
if rtype.NumOut() != 1 {
return nil, fmt.Errorf("invalid handler, has wrong number of outs: %v", rtype.NumOut())
}
// The return type of the method must be error.
if returnType := rtype.Out(0); returnType != typeOfError {
return nil, fmt.Errorf("invalid handler, returns %v not error", returnType.String())
}
return func(w http.ResponseWriter, r *http.Request) {
ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype
}
ctx := context.WithValue(r.Context(), rspCodeKey{}, &rspCodeVal{})
ctx = context.WithValue(ctx, rspHeaderKey{}, &rspHeaderVal{})
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(len(r.Header) + 8)
}
for k, v := range r.Header {
md[k] = strings.Join(v, ", ")
}
md["RemoteAddr"] = r.RemoteAddr
md["Method"] = r.Method
md["URL"] = r.URL.String()
md["Proto"] = r.Proto
md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength)
md["Transfer-Encoding"] = strings.Join(r.TransferEncoding, ",")
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
if r.TLS != nil {
md["TLS"] = "true"
md["TLS-ALPN"] = r.TLS.NegotiatedProtocol
md["TLS-ServerName"] = r.TLS.ServerName
}
ctx = metadata.NewIncomingContext(ctx, md)
path := r.URL.Path
if r.Body != nil {
defer r.Body.Close()
}
matches := make(map[string]interface{})
var match bool
var hldr *patHandler
var handler *httpHandler
for _, shdlr := range h.handlers {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(r.Method, path)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
break
} else if err == rhttp.ErrMethodNotAllowed && !h.registerRPC {
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = w.Write([]byte("not matching route found"))
return
}
}
if !match && h.registerRPC {
microMethod, mok := md.Get(metadata.HeaderEndpoint)
if mok {
serviceMethod := strings.Split(microMethod, ".")
if len(serviceMethod) == 2 {
if shdlr, ok := h.handlers[serviceMethod[0]]; ok {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(http.MethodPost, "/"+microMethod)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
}
}
}
}
}
// get fields from url values
if len(r.URL.RawQuery) > 0 {
umd, cerr := rflutil.URLMap(r.URL.RawQuery)
if cerr != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(cerr.Error()))
return
}
for k, v := range umd {
matches[k] = v
}
}
cf, err := h.newCodec(ct)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
var argv, replyv reflect.Value
// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if hldr.mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(hldr.mtype.ArgType.Elem())
} else {
argv = reflect.New(hldr.mtype.ArgType)
argIsValue = true
}
if argIsValue {
argv = argv.Elem()
}
// reply value
replyv = reflect.New(hldr.mtype.ReplyType.Elem())
function := hldr.mtype.method.Func
var returnValues []reflect.Value
if r.Body != nil {
var buf []byte
buf, err = io.ReadAll(r.Body)
if err != nil && err != io.EOF {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
}
matches = rflutil.FlattenMap(matches)
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
hr := &rpcRequest{
codec: cf,
service: handler.sopts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
endpoint: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
payload: argv.Interface(),
header: md,
}
// define the handler func
fn := func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(fctx), argv, reflect.ValueOf(rsp)})
// The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil {
err = rerr.(error)
}
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd {
md.Set(k, v)
}
}
metadata.SetOutgoingContext(ctx, md)
return err
}
// wrap the handler func
h.opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookHandler); ok {
fn = h(fn)
}
})
if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType)
if err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
ct = DefaultContentType
}
scode := int(200)
appErr := fn(ctx, hr, replyv.Interface())
w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md {
w.Header().Set(k, v)
}
}
if md := getRspHeader(ctx); md != nil {
for k, v := range md {
for _, vv := range v {
w.Header().Add(k, vv)
}
}
}
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
if cf, err = h.newCodec(nct); err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
return
}
}
var buf []byte
if appErr != nil {
switch verr := appErr.(type) {
case *errors.Error:
scode = int(verr.Code)
buf, err = cf.Marshal(verr)
case *Error:
buf, err = cf.Marshal(verr.err)
default:
buf, err = cf.Marshal(appErr)
}
} else {
buf, err = cf.Marshal(replyv.Interface())
}
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Errorf(handler.sopts.Context, "handler err: %v", err)
return
}
if nscode := GetRspCode(ctx); nscode != 0 {
scode = nscode
}
w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Errorf(ctx, "write failed: %v", cerr)
}
}, nil
}
func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ct := DefaultContentType ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" { if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype ct = htype
} }
ts := time.Now() if idx := strings.Index(ct, ":"); idx > 0 {
if ph, ok := h.contentTypeHandlers[ct[:idx]]; ok {
ph(w, r)
return
}
}
ctx := context.WithValue(r.Context(), rspCodeKey{}, &rspCodeVal{}) ctx := context.WithValue(r.Context(), rspCodeKey{}, &rspCodeVal{})
ctx = context.WithValue(ctx, rspHeaderKey{}, &rspHeaderVal{})
md, ok := metadata.FromIncomingContext(ctx) md, ok := metadata.FromIncomingContext(ctx)
if !ok { if !ok {
md = metadata.New(len(r.Header) + 8) md = metadata.New(len(r.Header))
} }
for k, v := range r.Header { for k, v := range r.Header {
md[k] = strings.Join(v, ", ") md.Set(k, strings.Join(v, ", "))
} }
md["RemoteAddr"] = r.RemoteAddr md.Set("RemoteAddr", r.RemoteAddr)
if r.TLS != nil { md.Set("Method", r.Method)
md["Scheme"] = "https" md.Set("URL", r.URL.String())
} else { md.Set("Proto", r.Proto)
md["Scheme"] = "http" md.Set("ContentLength", fmt.Sprintf("%d", r.ContentLength))
} md.Set("TransferEncoding", strings.Join(r.TransferEncoding, ","))
md["Method"] = r.Method md.Set("Host", r.Host)
md["URL"] = r.URL.String() md.Set("RequestURI", r.RequestURI)
md["Proto"] = r.Proto
md["ContentLength"] = fmt.Sprintf("%d", r.ContentLength)
if len(r.TransferEncoding) > 0 {
md["TransferEncoding"] = strings.Join(r.TransferEncoding, ",")
}
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
ctx = metadata.NewIncomingContext(ctx, md) ctx = metadata.NewIncomingContext(ctx, md)
defer r.Body.Close()
path := r.URL.Path path := r.URL.Path
if !strings.HasPrefix(path, "/") { if !strings.HasPrefix(path, "/") {
h.errorHandler(ctx, nil, w, r, fmt.Errorf("path must starts with /"), http.StatusBadRequest) h.errorHandler(ctx, nil, w, r, fmt.Errorf("path must starts with /"), http.StatusBadRequest)
return return
} }
matches := make(map[string]interface{}) cf, err := h.newCodec(ct)
if err != nil {
var match bool h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
var hldr *patHandler
var handler *httpHandler
for _, shdlr := range h.handlers {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(r.Method, path)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
break
} else if err == rhttp.ErrMethodNotAllowed && !h.registerRPC {
h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusMethodNotAllowed)
return
}
}
if !match && h.registerRPC {
microMethod, mok := md.Get(metadata.HeaderEndpoint)
if mok {
serviceMethod := strings.Split(microMethod, ".")
if len(serviceMethod) == 2 {
if shdlr, ok := h.handlers[serviceMethod[0]]; ok {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(http.MethodPost, "/"+microMethod)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
}
}
}
}
}
var sp tracer.Span
if !match && h.hd != nil {
if hdlr, ok := h.hd.Handler().(http.Handler); ok {
if !slices.Contains(tracer.DefaultSkipEndpoints, h.hd.Name()) {
ctx, sp = h.opts.Tracer.Start(ctx, h.hd.Name()+" rpc-server",
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", h.hd.Name(),
),
)
defer func() {
n := GetRspCode(ctx)
if s, _ := sp.Status(); s != tracer.SpanStatusError && n > 399 {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(n))
}
sp.Finish()
}()
}
if !slices.Contains(meter.DefaultSkipEndpoints, h.hd.Name()) {
h.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", h.hd.Name()).Inc()
defer func() {
n := GetRspCode(ctx)
if n > 399 {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", h.hd.Name(), "status", "success", "code", strconv.Itoa(n)).Inc()
} else {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", h.hd.Name(), "status", "failure", "code", strconv.Itoa(n)).Inc()
}
te := time.Since(ts)
h.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", h.hd.Name()).Update(te.Seconds())
h.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", h.hd.Name()).Update(te.Seconds())
h.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", h.hd.Name()).Dec()
}()
}
hdlr.ServeHTTP(w, r.WithContext(ctx))
return
}
} else if !match {
// check for http.HandlerFunc handlers
if !slices.Contains(tracer.DefaultSkipEndpoints, r.URL.Path) {
ctx, sp = h.opts.Tracer.Start(ctx, r.URL.Path+" rpc-server",
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", r.URL.Path,
),
)
defer func() {
if n := GetRspCode(ctx); n > 399 {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(n))
} else {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(http.StatusNotFound))
}
sp.Finish()
}()
}
if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r.WithContext(ctx))
return
}
h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound)
return return
} }
endpointName := fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name) matches := make(map[string]interface{})
topts := []tracer.SpanOption{
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", endpointName,
),
}
if slices.Contains(tracer.DefaultSkipEndpoints, endpointName) { var match bool
topts = append(topts, tracer.WithSpanRecord(false)) var hldr patHandler
} var handler *httpHandler
ctx, sp = h.opts.Tracer.Start(ctx, endpointName+" rpc-server", topts...) for _, hpat := range h.handlers {
handlertmp := hpat.(*httpHandler)
if !slices.Contains(meter.DefaultSkipEndpoints, handler.name) { for _, hldrtmp := range handlertmp.handlers[r.Method] {
defer func() { _, mp, ok := hldrtmp.pat.Search(r.Method, path)
te := time.Since(ts) if ok {
h.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", handler.name).Update(te.Seconds()) match = true
h.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", handler.name).Update(te.Seconds()) for k, v := range mp {
h.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", handler.name).Dec() matches[k] = v
}
n := GetRspCode(ctx) hldr = hldrtmp
if n > 399 { handler = handlertmp
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", handler.name, "status", "failure", "code", strconv.Itoa(n)).Inc() break
} else {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", handler.name, "status", "success", "code", strconv.Itoa(n)).Inc()
}
}()
}
defer func() {
n := GetRspCode(ctx)
if n > 399 {
if s, _ := sp.Status(); s != tracer.SpanStatusError {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(n))
} }
} }
sp.Finish() }
}()
if !match {
h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound)
return
}
// get fields from url values // get fields from url values
if len(r.URL.RawQuery) > 0 { if len(r.URL.RawQuery) > 0 {
@ -551,16 +150,6 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
if r.Body != nil {
defer r.Body.Close()
}
cf, err := h.newCodec(ct)
if err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
return
}
var argv, replyv reflect.Value var argv, replyv reflect.Value
// Decode the argument value. // Decode the argument value.
@ -582,26 +171,21 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
function := hldr.mtype.method.Func function := hldr.mtype.method.Func
var returnValues []reflect.Value var returnValues []reflect.Value
if r.Body != nil { if err = cf.ReadBody(r.Body, argv.Interface()); err != nil && err != io.EOF {
var buf []byte h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
buf, err = io.ReadAll(r.Body) return
if err != nil && err != io.EOF {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
} }
if len(matches) > 0 { matches = rflutil.FlattenMap(matches)
matches = rflutil.FlattenMap(matches) if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil { h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest) return
return }
}
b, err := cf.Marshal(argv.Interface())
if err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
} }
hr := &rpcRequest{ hr := &rpcRequest{
@ -609,14 +193,14 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
service: handler.sopts.Name, service: handler.sopts.Name,
contentType: ct, contentType: ct,
method: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name), method: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
endpoint: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name), body: b,
payload: argv.Interface(), payload: argv.Interface(),
header: md, header: md,
} }
// define the handler func // define the handler func
fn := func(fctx context.Context, req server.Request, rsp interface{}) (err error) { fn := func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(fctx), argv, reflect.ValueOf(rsp)}) returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(fctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
// The return value for the method is an error. // The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil { if rerr := returnValues[0].Interface(); rerr != nil {
@ -634,18 +218,13 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
metadata.SetOutgoingContext(ctx, md) metadata.SetOutgoingContext(ctx, md)
if err != nil && sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
return err return err
} }
h.opts.Hooks.EachNext(func(hook options.Hook) { // wrap the handler func
if h, ok := hook.(server.HookHandler); ok { for i := len(handler.sopts.HdlrWrappers); i > 0; i-- {
fn = h(fn) fn = handler.sopts.HdlrWrappers[i-1](fn)
} }
})
if ct == "application/x-www-form-urlencoded" { if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType) cf, err = h.newCodec(DefaultContentType)
@ -665,13 +244,6 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set(k, v) w.Header().Set(k, v)
} }
} }
if md := getRspHeader(ctx); md != nil {
for k, v := range md {
for _, vv := range v {
w.Header().Add(k, vv)
}
}
}
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct { if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
if cf, err = h.newCodec(nct); err != nil { if cf, err = h.newCodec(nct); err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest) h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
@ -679,23 +251,22 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
var buf []byte
if appErr != nil { if appErr != nil {
switch verr := appErr.(type) { switch verr := appErr.(type) {
case *errors.Error: case *errors.Error:
scode = int(verr.Code) scode = int(verr.Code)
buf, err = cf.Marshal(verr) b, err = cf.Marshal(verr)
case *Error: case *Error:
buf, err = cf.Marshal(verr.err) b, err = cf.Marshal(verr.err)
default: default:
buf, err = cf.Marshal(appErr) b, err = cf.Marshal(appErr)
} }
} else { } else {
buf, err = cf.Marshal(replyv.Interface()) b, err = cf.Marshal(replyv.Interface())
} }
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) { if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Error(handler.sopts.Context, "handler error", err) handler.sopts.Logger.Errorf(handler.sopts.Context, "handler err: %v", err)
return return
} }
@ -704,7 +275,7 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
w.WriteHeader(scode) w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil { if _, cerr := w.Write(b); cerr != nil {
handler.sopts.Logger.Error(ctx, "respoonse write error", cerr) logger.DefaultLogger.Errorf(ctx, "write failed: %v", cerr)
} }
} }

View File

@ -1,12 +0,0 @@
package handler
import (
// import required packages
_ "go.unistack.org/micro-proto/v3/openapiv3"
)
//go:generate sh -c "curl -L https://github.com/swagger-api/swagger-ui/archive/refs/tags/v4.18.3.zip -o - | bsdtar -C swagger-ui --strip-components=2 -xv swagger-ui-4.18.3/dist && rm swagger-ui/*.map swagger-ui/*-es-*.js swagger-ui/swagger-ui.js swagger-ui/swagger-initializer.js"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./meter/meter.proto"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./health/health.proto"

View File

@ -1,77 +0,0 @@
//go:build ignore
package graphql_handler
import (
"context"
"fmt"
"github.com/99designs/gqlgen/graphql"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/store"
)
var _ graphql.Cache = (*cacheWrapper)(nil)
type Handler struct {
opts Options
}
type Option func(*Options)
type Options struct {
cache *cacheWrapper
Path string
}
type cacheWrapper struct {
s store.Store
l logger.Logger
}
func (c *cacheWrapper) Get(ctx context.Context, key string) (interface{}, bool) {
var val interface{}
if err := c.s.Read(ctx, key, val); err != nil && err != store.ErrNotFound {
c.l.Error(ctx, fmt.Sprintf("cache.Get %s failed", key), err)
return nil, false
}
return val, true
}
func (c *cacheWrapper) Add(ctx context.Context, key string, val interface{}) {
if err := c.s.Write(ctx, key, val); err != nil {
c.l.Error(ctx, fmt.Sprintf("cache.Add %s failed", key), err)
}
}
func Store(s store.Store) Option {
return func(o *Options) {
if o.cache == nil {
o.cache = &cacheWrapper{}
}
o.cache.s = s
}
}
func Logger(l logger.Logger) Option {
return func(o *Options) {
if o.cache == nil {
o.cache = &cacheWrapper{}
}
o.cache.l = l
}
}
func Path(path string) Option {
return func(o *Options) {
o.Path = path
}
}
func NewHandler(opts ...Option) *Handler {
options := Options{}
for _, o := range opts {
o(&options)
}
return &Handler{opts: options}
}

View File

@ -1,82 +0,0 @@
package health_handler
import (
"context"
codecpb "go.unistack.org/micro-proto/v3/codec"
"go.unistack.org/micro/v3/errors"
)
var _ HealthServiceServer = &Handler{}
type Handler struct {
opts Options
}
type CheckFunc func(context.Context) error
type Option func(*Options)
type Options struct {
Version string
Name string
LiveChecks []CheckFunc
ReadyChecks []CheckFunc
}
func LiveChecks(fns ...CheckFunc) Option {
return func(o *Options) {
o.LiveChecks = append(o.LiveChecks, fns...)
}
}
func ReadyChecks(fns ...CheckFunc) Option {
return func(o *Options) {
o.ReadyChecks = append(o.ReadyChecks, fns...)
}
}
func Name(name string) Option {
return func(o *Options) {
o.Name = name
}
}
func Version(version string) Option {
return func(o *Options) {
o.Version = version
}
}
func NewHandler(opts ...Option) *Handler {
options := Options{}
for _, o := range opts {
o(&options)
}
return &Handler{opts: options}
}
func (h *Handler) Live(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
var err error
for _, fn := range h.opts.LiveChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
return nil
}
func (h *Handler) Ready(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
var err error
for _, fn := range h.opts.ReadyChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
return nil
}
func (h *Handler) Version(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
rsp.Data = []byte(h.opts.Version)
return nil
}

View File

@ -1,50 +0,0 @@
syntax = "proto3";
package micro.server.http.v3.handler.health;
option go_package = "go.unistack.org/micro-server-http/v3/handler/health;health_handler";
import "api/annotations.proto";
import "openapiv3/annotations.proto";
import "codec/frame.proto";
service HealthService {
rpc Live(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Live";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = { get: "/live"; };
};
rpc Ready(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Ready";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = { get: "/ready"; };
};
rpc Version(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Version";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = { get: "/version"; };
};
};

View File

@ -1,29 +0,0 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions:
// - protoc-gen-go-micro v3.10.4
// - protoc v5.26.1
// source: health/health.proto
package health_handler
import (
context "context"
codec "go.unistack.org/micro-proto/v3/codec"
client "go.unistack.org/micro/v3/client"
)
var (
HealthServiceName = "HealthService"
)
type HealthServiceClient interface {
Live(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
Ready(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
Version(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
}
type HealthServiceServer interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}

View File

@ -1,135 +0,0 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.10.4
// source: health/health.proto
package health_handler
import (
context "context"
v31 "go.unistack.org/micro-client-http/v3"
codec "go.unistack.org/micro-proto/v3/codec"
v3 "go.unistack.org/micro-server-http/v3"
client "go.unistack.org/micro/v3/client"
server "go.unistack.org/micro/v3/server"
http "net/http"
)
var (
HealthServiceServerEndpoints = []v3.EndpointMetadata{
{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type healthServiceClient struct {
c client.Client
name string
}
func NewHealthServiceClient(name string, c client.Client) HealthServiceClient {
return &healthServiceClient{c: c, name: name}
}
func (c *healthServiceClient) Live(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/live"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Live", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
func (c *healthServiceClient) Ready(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/ready"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Ready", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
func (c *healthServiceClient) Version(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/version"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Version", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
type healthServiceServer struct {
HealthServiceServer
}
func (h *healthServiceServer) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServiceServer.Live(ctx, req, rsp)
}
func (h *healthServiceServer) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServiceServer.Ready(ctx, req, rsp)
}
func (h *healthServiceServer) Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServiceServer.Version(ctx, req, rsp)
}
func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts ...server.HandlerOption) error {
type healthService interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}
type HealthService struct {
healthService
}
h := &healthServiceServer{sh}
var nopts []server.HandlerOption
nopts = append(nopts, v3.HandlerEndpoints(HealthServiceServerEndpoints))
return s.Handle(s.NewHandler(&HealthService{h}, append(nopts, opts...)...))
}

View File

@ -1,132 +0,0 @@
package meter_handler
import (
"bytes"
"compress/gzip"
"context"
"io"
"strings"
"sync"
codecpb "go.unistack.org/micro-proto/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
)
const (
contentEncodingHeader = "Content-Encoding"
acceptEncodingHeader = "Accept-Encoding"
)
var gzipPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
// guard to fail early
var _ MeterServiceServer = &Handler{}
type Handler struct {
opts Options
}
type Option func(*Options)
type Options struct {
Meter meter.Meter
Name string
MeterOptions []meter.Option
DisableCompress bool
}
func Meter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
func Name(name string) Option {
return func(o *Options) {
o.Name = name
}
}
func DisableCompress(g bool) Option {
return func(o *Options) {
o.DisableCompress = g
}
}
func MeterOptions(opts ...meter.Option) Option {
return func(o *Options) {
o.MeterOptions = append(o.MeterOptions, opts...)
}
}
func NewOptions(opts ...Option) Options {
options := Options{Meter: meter.DefaultMeter, DisableCompress: false}
for _, o := range opts {
o(&options)
}
return options
}
func NewHandler(opts ...Option) *Handler {
options := NewOptions(opts...)
return &Handler{opts: options}
}
func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
log, ok := logger.FromContext(ctx)
if !ok {
log = logger.DefaultLogger
}
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
w := io.Writer(buf)
if md, ok := metadata.FromOutgoingContext(ctx); gzipAccepted(md) && ok && !h.opts.DisableCompress {
omd, _ := metadata.FromOutgoingContext(ctx)
omd.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
gz.Flush()
}
if err := h.opts.Meter.Write(w, h.opts.MeterOptions...); err != nil {
log.Error(ctx, "http/meter write failed", err)
return nil
}
rsp.Data = buf.Bytes()
return nil
}
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(md metadata.Metadata) bool {
a, ok := md.Get(acceptEncodingHeader)
if !ok {
return false
}
if strings.Contains(a, "gzip") {
return true
}
return false
}

View File

@ -1,24 +0,0 @@
syntax = "proto3";
package micro.server.http.v3.handler.meter;
option go_package = "go.unistack.org/micro-server-http/v3/handler/meter;meter_handler";
import "api/annotations.proto";
import "openapiv3/annotations.proto";
import "codec/frame.proto";
service MeterService {
rpc Metrics(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Metrics";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = { get: "/metrics"; };
};
};

View File

@ -1,25 +0,0 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions:
// - protoc-gen-go-micro v3.10.4
// - protoc v5.26.1
// source: meter/meter.proto
package meter_handler
import (
context "context"
codec "go.unistack.org/micro-proto/v3/codec"
client "go.unistack.org/micro/v3/client"
)
var (
MeterServiceName = "MeterService"
)
type MeterServiceClient interface {
Metrics(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
}
type MeterServiceServer interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}

View File

@ -1,75 +0,0 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.10.4
// source: meter/meter.proto
package meter_handler
import (
context "context"
v31 "go.unistack.org/micro-client-http/v3"
codec "go.unistack.org/micro-proto/v3/codec"
v3 "go.unistack.org/micro-server-http/v3"
client "go.unistack.org/micro/v3/client"
server "go.unistack.org/micro/v3/server"
http "net/http"
)
var (
MeterServiceServerEndpoints = []v3.EndpointMetadata{
{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type meterServiceClient struct {
c client.Client
name string
}
func NewMeterServiceClient(name string, c client.Client) MeterServiceClient {
return &meterServiceClient{c: c, name: name}
}
func (c *meterServiceClient) Metrics(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/metrics"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "MeterService.Metrics", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
type meterServiceServer struct {
MeterServiceServer
}
func (h *meterServiceServer) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.MeterServiceServer.Metrics(ctx, req, rsp)
}
func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...server.HandlerOption) error {
type meterService interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}
type MeterService struct {
meterService
}
h := &meterServiceServer{sh}
var nopts []server.HandlerOption
nopts = append(nopts, v3.HandlerEndpoints(MeterServiceServerEndpoints))
return s.Handle(s.NewHandler(&MeterService{h}, append(nopts, opts...)...))
}

View File

@ -1,46 +0,0 @@
package pprof_handler
import (
"expvar"
"net/http"
"net/http/pprof"
"path"
"strings"
)
func NewHandler(prefixPath string, initFuncs ...func()) http.HandlerFunc {
for _, fn := range initFuncs {
fn()
}
return func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.EqualFold(r.RequestURI, prefixPath) && r.RequestURI[len(r.RequestURI)-1] != '/':
http.Redirect(w, r, r.RequestURI+"/", http.StatusMovedPermanently)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "cmdline")):
pprof.Cmdline(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "profile")):
pprof.Profile(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "symbol")):
pprof.Symbol(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "trace")):
pprof.Trace(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "goroutine")):
pprof.Handler("goroutine").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "threadcreate")):
pprof.Handler("threadcreate").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "mutex")):
pprof.Handler("mutex").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "heap")):
pprof.Handler("heap").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "block")):
pprof.Handler("block").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "allocs")):
pprof.Handler("allocs").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "vars")):
expvar.Handler().ServeHTTP(w, r)
default:
pprof.Index(w, r)
}
}
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 665 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 628 B

View File

@ -1,16 +0,0 @@
html {
box-sizing: border-box;
overflow: -moz-scrollbars-vertical;
overflow-y: scroll;
}
*,
*:before,
*:after {
box-sizing: inherit;
}
body {
margin: 0;
background: #fafafa;
}

View File

@ -1,19 +0,0 @@
<!-- HTML for static distribution bundle build -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Swagger UI</title>
<link rel="stylesheet" type="text/css" href="./swagger-ui.css" />
<link rel="stylesheet" type="text/css" href="index.css" />
<link rel="icon" type="image/png" href="./favicon-32x32.png" sizes="32x32" />
<link rel="icon" type="image/png" href="./favicon-16x16.png" sizes="16x16" />
</head>
<body>
<div id="swagger-ui"></div>
<script src="./swagger-ui-bundle.js" charset="UTF-8"> </script>
<script src="./swagger-ui-standalone-preset.js" charset="UTF-8"> </script>
<script src="./swagger-initializer.js" charset="UTF-8"> </script>
</body>
</html>

View File

@ -1,79 +0,0 @@
<!doctype html>
<html lang="en-US">
<head>
<title>Swagger UI: OAuth2 Redirect</title>
</head>
<body>
<script>
'use strict';
function run () {
var oauth2 = window.opener.swaggerUIRedirectOauth2;
var sentState = oauth2.state;
var redirectUrl = oauth2.redirectUrl;
var isValid, qp, arr;
if (/code|token|error/.test(window.location.hash)) {
qp = window.location.hash.substring(1).replace('?', '&');
} else {
qp = location.search.substring(1);
}
arr = qp.split("&");
arr.forEach(function (v,i,_arr) { _arr[i] = '"' + v.replace('=', '":"') + '"';});
qp = qp ? JSON.parse('{' + arr.join() + '}',
function (key, value) {
return key === "" ? value : decodeURIComponent(value);
}
) : {};
isValid = qp.state === sentState;
if ((
oauth2.auth.schema.get("flow") === "accessCode" ||
oauth2.auth.schema.get("flow") === "authorizationCode" ||
oauth2.auth.schema.get("flow") === "authorization_code"
) && !oauth2.auth.code) {
if (!isValid) {
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "warning",
message: "Authorization may be unsafe, passed state was changed in server. The passed state wasn't returned from auth server."
});
}
if (qp.code) {
delete oauth2.state;
oauth2.auth.code = qp.code;
oauth2.callback({auth: oauth2.auth, redirectUrl: redirectUrl});
} else {
let oauthErrorMsg;
if (qp.error) {
oauthErrorMsg = "["+qp.error+"]: " +
(qp.error_description ? qp.error_description+ ". " : "no accessCode received from the server. ") +
(qp.error_uri ? "More info: "+qp.error_uri : "");
}
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "error",
message: oauthErrorMsg || "[Authorization failed]: no accessCode received from the server."
});
}
} else {
oauth2.callback({auth: oauth2.auth, token: qp, isValid: isValid, redirectUrl: redirectUrl});
}
window.close();
}
if (document.readyState !== 'loading') {
run();
} else {
document.addEventListener('DOMContentLoaded', function () {
run();
});
}
</script>
</body>
</html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,142 +0,0 @@
package swaggerui_handler
import (
"embed"
"html/template"
"net/http"
"path"
"reflect"
)
//go:embed *.js *.css *.html *.png
var assets embed.FS
var (
Handler = func(prefix string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet || path.Base(r.URL.Path) != "swagger-initializer.js" {
http.StripPrefix(prefix, http.FileServer(http.FS(assets))).ServeHTTP(w, r)
return
}
tpl := template.New("swagger-initializer.js").Funcs(TemplateFuncs)
ptpl, err := tpl.Parse(Template)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if err := ptpl.Execute(w, Config); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
}
}
TemplateFuncs = template.FuncMap{
"isInt": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Int, reflect.Int8, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64:
return true
default:
return false
}
},
"isBool": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Bool:
return true
default:
return false
}
},
"isString": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.String:
return true
default:
return false
}
},
"isSlice": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Slice:
return true
default:
return false
}
},
"isMap": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Map:
return true
default:
return false
}
},
}
Template = `
window.onload = function() {
//<editor-fold desc="Changeable Configuration Block">
window.ui = SwaggerUIBundle({
{{- range $k, $v := . }}
{{- if (eq (printf "%s" $v) "") -}}
{{- continue -}}
{{ end }}
{{ $k }}: {{ if isBool $v -}}
{{- $v -}},
{{- else if isInt $v -}}
{{- $v -}},
{{- else if isString $v -}}
"{{- $v -}}",
{{- else if and (isSlice $v) (or (eq (printf "%s" $k) "presets") (eq (printf "%s" $k) "plugins")) -}}
[
{{- range $v }}
{{ . }},
{{- end }}
],
{{- end -}}
{{ end }}
});
//</editor-fold>
};`
Config = map[string]interface{}{
"configUrl": "",
"dom_id": "#swagger-ui",
/*
"domNode": "",
"spec": "",
"urls": []interface{}{
map[string]interface{}{
"url": "",
"name": "",
},
},
},
*/
"url": "https://petstore.swagger.io/v2/swagger.json",
"deepLinking": true,
"displayOperationId": false,
"defaultModelsExpandDepth": 1,
"defaultModelExpandDepth": 1,
"displayRequestDuration": true,
"filter": true,
"operationsSorter": "alpha",
"showExtensions": true,
"tryItOutEnabled": true,
"presets": []string{
"SwaggerUIBundle.presets.apis",
"SwaggerUIStandalonePreset",
},
"plugins": []string{
"SwaggerUIBundle.plugins.DownloadUrl",
},
"layout": "StandaloneLayout",
}
)

View File

@ -1,15 +0,0 @@
package swaggerui_handler
import (
"net/http"
"testing"
)
func TestTemplate(t *testing.T) {
t.Skip()
h := http.NewServeMux()
h.HandleFunc("/", Handler(""))
if err := http.ListenAndServe(":8080", h); err != nil {
t.Fatal(err)
}
}

View File

@ -1,61 +0,0 @@
package swagger_handler
import (
"io/fs"
"net/http"
yamlcodec "go.unistack.org/micro-codec-yaml/v3"
rutil "go.unistack.org/micro/v3/util/reflect"
)
// Handler append to generated swagger data from dst map[string]interface{}
var Handler = func(dst map[string]interface{}, fsys fs.FS) http.HandlerFunc {
c := yamlcodec.NewCodec()
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotFound)
return
}
path := r.URL.Path
if len(path) > 1 && path[0] == '/' {
path = path[1:]
}
buf, err := fs.ReadFile(fsys, path)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if dst == nil {
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
return
}
var src interface{}
if err = c.Unmarshal(buf, src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if err = rutil.Merge(src, dst); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if buf, err = c.Marshal(src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
}
}

301
http.go
View File

@ -1,10 +1,9 @@
// Package http implements a go-micro.Server // Package http implements a go-micro.Server
package http // import "go.unistack.org/micro-server-http/v3" package http
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
@ -23,24 +22,35 @@ import (
"golang.org/x/net/netutil" "golang.org/x/net/netutil"
) )
var _ server.Server = (*Server)(nil) var httpAllMethods = []string{
http.MethodConnect,
http.MethodDelete,
http.MethodGet,
http.MethodHead,
http.MethodOptions,
http.MethodPatch,
http.MethodPost,
http.MethodPut,
http.MethodTrace,
}
type Server struct { type httpServer struct {
hd server.Handler hd server.Handler
rsvc *register.Service rsvc *register.Service
handlers map[string]server.Handler handlers map[string]server.Handler
exit chan chan error exit chan chan error
subscribers map[*httpSubscriber][]broker.Subscriber subscribers map[*httpSubscriber][]broker.Subscriber
errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int) errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int)
pathHandlers *rhttp.Trie pathHandlers *rhttp.Trie
opts server.Options contentTypeHandlers map[string]http.HandlerFunc
registerRPC bool opts server.Options
registerRPC bool
sync.RWMutex sync.RWMutex
registered bool registered bool
init bool init bool
} }
func (h *Server) newCodec(ct string) (codec.Codec, error) { func (h *httpServer) newCodec(ct string) (codec.Codec, error) {
if idx := strings.IndexRune(ct, ';'); idx >= 0 { if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx] ct = ct[:idx]
} }
@ -53,14 +63,14 @@ func (h *Server) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType return nil, codec.ErrUnknownContentType
} }
func (h *Server) Options() server.Options { func (h *httpServer) Options() server.Options {
h.Lock() h.Lock()
opts := h.opts opts := h.opts
h.Unlock() h.Unlock()
return opts return opts
} }
func (h *Server) Init(opts ...server.Option) error { func (h *httpServer) Init(opts ...server.Option) error {
if len(opts) == 0 && h.init { if len(opts) == 0 && h.init {
return nil return nil
} }
@ -79,19 +89,22 @@ func (h *Server) Init(opts ...server.Option) error {
if h.pathHandlers == nil { if h.pathHandlers == nil {
h.pathHandlers = rhttp.NewTrie() h.pathHandlers = rhttp.NewTrie()
} }
if h.contentTypeHandlers == nil {
h.contentTypeHandlers = make(map[string]http.HandlerFunc)
}
if v, ok := h.opts.Context.Value(registerRPCHandlerKey{}).(bool); ok { if v, ok := h.opts.Context.Value(registerRPCHandlerKey{}).(bool); ok {
h.registerRPC = v h.registerRPC = v
} }
if phs, ok := h.opts.Context.Value(pathHandlerKey{}).(*pathHandlerVal); ok && phs.h != nil { if phs, ok := h.opts.Context.Value(pathHandlerKey{}).(*pathHandlerVal); ok && phs.h != nil {
for pm, ps := range phs.h { for pp, ph := range phs.h {
for pp, ph := range ps { h.pathHandlers.Insert(httpAllMethods, pp, ph)
if err := h.pathHandlers.Insert([]string{pm}, pp, ph); err != nil { }
h.Unlock() }
return err if phs, ok := h.opts.Context.Value(contentTypeHandlerKey{}).(*contentTypeHandlerVal); ok && phs.h != nil {
} for pp, ph := range phs.h {
} h.contentTypeHandlers[pp] = ph
} }
} }
h.Unlock() h.Unlock()
@ -109,6 +122,10 @@ func (h *Server) Init(opts ...server.Option) error {
h.RUnlock() h.RUnlock()
return err return err
} }
if err := h.opts.Auth.Init(); err != nil {
h.RUnlock()
return err
}
if err := h.opts.Logger.Init(); err != nil { if err := h.opts.Logger.Init(); err != nil {
h.RUnlock() h.RUnlock()
return err return err
@ -130,8 +147,7 @@ func (h *Server) Init(opts ...server.Option) error {
return nil return nil
} }
func (h *Server) Handle(handler server.Handler) error { func (h *httpServer) Handle(handler server.Handler) error {
// passed unknown handler
hdlr, ok := handler.(*httpHandler) hdlr, ok := handler.(*httpHandler)
if !ok { if !ok {
h.Lock() h.Lock()
@ -140,7 +156,6 @@ func (h *Server) Handle(handler server.Handler) error {
return nil return nil
} }
// passed http.Handler like some muxer
if _, ok := hdlr.hd.(http.Handler); ok { if _, ok := hdlr.hd.(http.Handler); ok {
h.Lock() h.Lock()
h.hd = handler h.hd = handler
@ -148,7 +163,6 @@ func (h *Server) Handle(handler server.Handler) error {
return nil return nil
} }
// passed micro compat handler
h.Lock() h.Lock()
if h.handlers == nil { if h.handlers == nil {
h.handlers = make(map[string]server.Handler) h.handlers = make(map[string]server.Handler)
@ -159,7 +173,7 @@ func (h *Server) Handle(handler server.Handler) error {
return nil return nil
} }
func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.NewHandlerOptions(opts...) options := server.NewHandlerOptions(opts...)
eps := make([]*register.Endpoint, 0, len(options.Metadata)) eps := make([]*register.Endpoint, 0, len(options.Metadata))
@ -171,32 +185,20 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
} }
hdlr := &httpHandler{ hdlr := &httpHandler{
eps: eps, eps: eps,
hd: handler, hd: handler,
opts: options, opts: options,
sopts: h.opts, sopts: h.opts,
handlers: rhttp.NewTrie(),
} }
tp := reflect.TypeOf(handler) tp := reflect.TypeOf(handler)
type nilHandler struct{}
/* hdlr.handlers = make(map[string][]patHandler)
if len(options.Metadata) == 0 {
if h.registerRPC {
h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn)
}
}
}
*/
registerCORS := false
if v, ok := options.Context.Value(registerCORSHandlerKey{}).(bool); ok && v {
registerCORS = true
}
for hn, md := range options.Metadata { for hn, md := range options.Metadata {
pat := rhttp.NewTrie()
pat.Insert([]string{md["Method"]}, md["Path"], &nilHandler{})
var method reflect.Method var method reflect.Method
mname := hn[strings.Index(hn, ".")+1:] mname := hn[strings.Index(hn, ".")+1:]
for m := 0; m < tp.NumMethod(); m++ { for m := 0; m < tp.NumMethod(); m++ {
@ -209,7 +211,7 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
} }
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) { if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname)) h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname)
continue continue
} }
@ -218,105 +220,35 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
h.opts.Logger.Errorf(h.opts.Context, "%v", err) h.opts.Logger.Errorf(h.opts.Context, "%v", err)
continue continue
} else if mtype == nil { } else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
continue continue
} }
rcvr := reflect.ValueOf(handler) rcvr := reflect.ValueOf(handler)
name := reflect.Indirect(rcvr).Type().Name() name := reflect.Indirect(rcvr).Type().Name()
pth := &patHandler{mtype: mtype, name: name, rcvr: rcvr} pth := patHandler{pat: pat, mtype: mtype, name: name, rcvr: rcvr}
hdlr.name = name hdlr.name = name
hdlr.handlers[md["Method"]] = append(hdlr.handlers[md["Method"]], pth)
methods := []string{md["Method"]} if !h.registerRPC {
if registerCORS {
methods = append(methods, http.MethodOptions)
}
if err := hdlr.handlers.Insert(methods, md["Path"], pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %v %s", methods, md["Path"]))
}
if h.registerRPC {
methods := []string{http.MethodPost}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
if err := hdlr.handlers.Insert(methods, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
}
}
}
metadata, ok := options.Context.Value(handlerEndpointsKey{}).([]EndpointMetadata)
if !ok {
return hdlr
}
for _, md := range metadata {
hn := md.Name
var method reflect.Method
mname := hn[strings.Index(hn, ".")+1:]
for m := 0; m < tp.NumMethod(); m++ {
mn := tp.Method(m)
if mn.Name != mname {
continue
}
method = mn
break
}
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname))
continue continue
} }
mtype, err := prepareEndpoint(method) rpat := rhttp.NewTrie()
if err != nil && h.opts.Logger.V(logger.ErrorLevel) { rpat.Insert([]string{http.MethodPost}, "/"+hn, &nilHandler{})
h.opts.Logger.Errorf(h.opts.Context, "%v", err)
continue
} else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
continue
}
rcvr := reflect.ValueOf(handler) pth = patHandler{pat: rpat, mtype: mtype, name: name, rcvr: rcvr}
name := reflect.Indirect(rcvr).Type().Name() hdlr.handlers[http.MethodPost] = append(hdlr.handlers[http.MethodPost], pth)
pth := &patHandler{mtype: mtype, name: name, rcvr: rcvr}
hdlr.name = name
methods := []string{md.Method}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
if err := hdlr.handlers.Insert(methods, md.Path, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %s %s", md.Method, md.Path))
}
if h.registerRPC {
methods := []string{http.MethodPost}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
h.opts.Logger.Info(h.opts.Context, fmt.Sprintf("register rpc handler for http.MethodPost %s /%s", hn, hn))
if err := hdlr.handlers.Insert(methods, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
}
}
} }
return hdlr return hdlr
} }
func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, handler, opts...) return newSubscriber(topic, handler, opts...)
} }
func (h *Server) Subscribe(sb server.Subscriber) error { func (h *httpServer) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*httpSubscriber) sub, ok := sb.(*httpSubscriber)
if !ok { if !ok {
return fmt.Errorf("invalid subscriber: expected *httpSubscriber") return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
@ -341,7 +273,7 @@ func (h *Server) Subscribe(sb server.Subscriber) error {
return nil return nil
} }
func (h *Server) Register() error { func (h *httpServer) Register() error {
var eps []*register.Endpoint var eps []*register.Endpoint
h.RLock() h.RLock()
for _, hdlr := range h.handlers { for _, hdlr := range h.handlers {
@ -387,7 +319,7 @@ func (h *Server) Register() error {
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)) config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
} }
} }
@ -402,17 +334,6 @@ func (h *Server) Register() error {
} }
h.Lock() h.Lock()
h.registered = true
h.rsvc = service
h.Unlock()
return nil
}
func (h *Server) subscribe() error {
config := h.opts
for sb := range h.subscribers { for sb := range h.subscribers {
handler := h.createSubHandler(sb, config) handler := h.createSubHandler(sb, config)
var opts []broker.SubscribeOption var opts []broker.SubscribeOption
@ -426,7 +347,6 @@ func (h *Server) subscribe() error {
} }
opts = append(opts, broker.SubscribeContext(subCtx)) opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil { if err != nil {
@ -436,10 +356,14 @@ func (h *Server) subscribe() error {
h.subscribers[sb] = []broker.Subscriber{sub} h.subscribers[sb] = []broker.Subscriber{sub}
} }
h.registered = true
h.rsvc = service
h.Unlock()
return nil return nil
} }
func (h *Server) Deregister() error { func (h *httpServer) Deregister() error {
h.RLock() h.RLock()
config := h.opts config := h.opts
h.RUnlock() h.RUnlock()
@ -450,7 +374,7 @@ func (h *Server) Deregister() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Deregistering node: "+service.Nodes[0].ID) config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID)
} }
if err := server.DefaultDeregisterFunc(service, config); err != nil { if err := server.DefaultDeregisterFunc(service, config); err != nil {
@ -474,10 +398,10 @@ func (h *Server) Deregister() error {
} }
for _, sub := range subs { for _, sub := range subs {
config.Logger.Info(config.Context, "Unsubscribing from topic: "+sub.Topic()) config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", sub.Topic())
if err := sub.Unsubscribe(subCtx); err != nil { if err := sub.Unsubscribe(subCtx); err != nil {
h.Unlock() h.Unlock()
config.Logger.Error(config.Context, fmt.Sprintf("failed to unsubscribe topic: %s, error", sb.Topic()), err) config.Logger.Errorf(config.Context, "failed to unsubscribe topic: %s, error: %v", sb.Topic(), err)
return err return err
} }
} }
@ -487,7 +411,7 @@ func (h *Server) Deregister() error {
return nil return nil
} }
func (h *Server) Start() error { func (h *httpServer) Start() error {
h.RLock() h.RLock()
config := h.opts config := h.opts
h.RUnlock() h.RUnlock()
@ -517,7 +441,7 @@ func (h *Server) Start() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Listening on "+ts.Addr().String()) config.Logger.Infof(config.Context, "Listening on %s", ts.Addr().String())
} }
h.Lock() h.Lock()
@ -525,6 +449,7 @@ func (h *Server) Start() error {
h.Unlock() h.Unlock()
var handler http.Handler var handler http.Handler
var srvFunc func(net.Listener) error
// nolint: nestif // nolint: nestif
if h.opts.Context != nil { if h.opts.Context != nil {
@ -540,12 +465,9 @@ func (h *Server) Start() error {
} }
} }
switch { if handler == nil && h.hd == nil {
case handler == nil && h.hd == nil:
handler = h handler = h
case len(h.handlers) > 0 && h.hd != nil: } else if handler == nil && h.hd != nil {
handler = h
case handler == nil && h.hd != nil:
if hdlr, ok := h.hd.Handler().(http.Handler); ok { if hdlr, ok := h.hd.Handler().(http.Handler); ok {
handler = hdlr handler = hdlr
} }
@ -561,7 +483,7 @@ func (h *Server) Start() error {
if err := config.RegisterCheck(h.opts.Context); err != nil { if err := config.RegisterCheck(h.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), err) config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err)
} }
} else { } else {
if err = h.Register(); err != nil { if err = h.Register(); err != nil {
@ -569,13 +491,8 @@ func (h *Server) Start() error {
} }
} }
if err := h.subscribe(); err != nil {
return err
}
fn := handler fn := handler
var hs *http.Server
if h.opts.Context != nil { if h.opts.Context != nil {
if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 { if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 {
// wrap the handler func // wrap the handler func
@ -583,19 +500,25 @@ func (h *Server) Start() error {
fn = mwf[i-1](fn) fn = mwf[i-1](fn)
} }
} }
var ok bool if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
if hs, ok = h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
hs.Handler = fn hs.Handler = fn
} else { srvFunc = hs.Serve
hs = &http.Server{Handler: fn}
} }
} }
go func() { if srvFunc != nil {
if cerr := hs.Serve(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) { go func() {
h.opts.Logger.Error(h.opts.Context, "serve error", cerr) if cerr := srvFunc(ts); cerr != nil && !strings.Contains(cerr.Error(), "use of closed network connection") {
} h.opts.Logger.Error(h.opts.Context, cerr)
}() }
}()
} else {
go func() {
if cerr := http.Serve(ts, fn); cerr != nil && !strings.Contains(cerr.Error(), "use of closed network connection") {
h.opts.Logger.Error(h.opts.Context, cerr)
}
}()
}
go func() { go func() {
t := new(time.Ticker) t := new(time.Ticker)
@ -621,28 +544,28 @@ func (h *Server) Start() error {
// nolint: nestif // nolint: nestif
if rerr != nil && registered { if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error, deregister it", config.Name, config.ID), rerr) config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
} }
// deregister self in case of error // deregister self in case of error
if err := h.Deregister(); err != nil { if err := h.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error", config.Name, config.ID), err) config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err)
} }
} }
} else if rerr != nil && !registered { } else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), rerr) config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr)
} }
continue continue
} }
if err := h.Register(); err != nil { if err := h.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error", config.Name, config.ID), err) config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err)
} }
} }
if err := h.Register(); err != nil { if err := h.Register(); err != nil {
config.Logger.Error(config.Context, "Server register error", err) config.Logger.Errorf(config.Context, "Server register error: %s", err)
} }
// wait for exit // wait for exit
case ch = <-h.exit: case ch = <-h.exit:
@ -652,52 +575,40 @@ func (h *Server) Start() error {
// deregister // deregister
if err := h.Deregister(); err != nil { if err := h.Deregister(); err != nil {
config.Logger.Error(config.Context, "Server deregister error", err) config.Logger.Errorf(config.Context, "Server deregister error: %s", err)
} }
if err := config.Broker.Disconnect(config.Context); err != nil { if err := config.Broker.Disconnect(config.Context); err != nil {
config.Logger.Error(config.Context, "Broker disconnect error", err) config.Logger.Errorf(config.Context, "Broker disconnect error: %s", err)
} }
ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) ch <- ts.Close()
defer cancel()
err := hs.Shutdown(ctx)
if err != nil {
err = hs.Close()
}
ch <- err
}() }()
return nil return nil
} }
func (h *Server) Stop() error { func (h *httpServer) Stop() error {
ch := make(chan error) ch := make(chan error)
h.exit <- ch h.exit <- ch
return <-ch return <-ch
} }
func (h *Server) String() string { func (h *httpServer) String() string {
return "http" return "http"
} }
func (h *Server) Name() string { func (h *httpServer) Name() string {
return h.opts.Name return h.opts.Name
} }
func NewServer(opts ...server.Option) *Server { func NewServer(opts ...server.Option) server.Server {
options := server.NewOptions(opts...) options := server.NewOptions(opts...)
eh := DefaultErrorHandler return &httpServer{
if v, ok := options.Context.Value(errorHandlerKey{}).(errorHandler); ok && v != nil {
eh = v
}
return &Server{
opts: options, opts: options,
exit: make(chan chan error), exit: make(chan chan error),
subscribers: make(map[*httpSubscriber][]broker.Subscriber), subscribers: make(map[*httpSubscriber][]broker.Subscriber),
errorHandler: eh, errorHandler: DefaultErrorHandler,
pathHandlers: rhttp.NewTrie(), pathHandlers: rhttp.NewTrie(),
} }
} }

View File

@ -11,12 +11,17 @@ type httpMessage struct {
header metadata.Metadata header metadata.Metadata
topic string topic string
contentType string contentType string
body []byte
} }
func (r *httpMessage) Topic() string { func (r *httpMessage) Topic() string {
return r.topic return r.topic
} }
func (r *httpMessage) Payload() interface{} {
return r.payload
}
func (r *httpMessage) ContentType() string { func (r *httpMessage) ContentType() string {
return r.contentType return r.contentType
} }
@ -25,8 +30,8 @@ func (r *httpMessage) Header() metadata.Metadata {
return r.header return r.header
} }
func (r *httpMessage) Body() interface{} { func (r *httpMessage) Body() []byte {
return r.payload return r.body
} }
func (r *httpMessage) Codec() codec.Codec { func (r *httpMessage) Codec() codec.Codec {

View File

@ -38,20 +38,6 @@ type (
} }
) )
type (
rspHeaderKey struct{}
rspHeaderVal struct {
h http.Header
}
)
// SetRspHeader add response headers
func SetRspHeader(ctx context.Context, h http.Header) {
if rsp, ok := ctx.Value(rspHeaderKey{}).(*rspHeaderVal); ok {
rsp.h = h
}
}
// SetRspCode saves response code in context, must be used by handler to specify http code // SetRspCode saves response code in context, must be used by handler to specify http code
func SetRspCode(ctx context.Context, code int) { func SetRspCode(ctx context.Context, code int) {
if rsp, ok := ctx.Value(rspCodeKey{}).(*rspCodeVal); ok { if rsp, ok := ctx.Value(rspCodeKey{}).(*rspCodeVal); ok {
@ -59,17 +45,9 @@ func SetRspCode(ctx context.Context, code int) {
} }
} }
// getRspHeader get http.Header from context
func getRspHeader(ctx context.Context) http.Header {
if rsp, ok := ctx.Value(rspHeaderKey{}).(*rspHeaderVal); ok {
return rsp.h
}
return nil
}
// GetRspCode used internally by generated http server handler // GetRspCode used internally by generated http server handler
func GetRspCode(ctx context.Context) int { func GetRspCode(ctx context.Context) int {
code := int(200) var code int
if rsp, ok := ctx.Value(rspCodeKey{}).(*rspCodeVal); ok { if rsp, ok := ctx.Value(rspCodeKey{}).(*rspCodeVal); ok {
code = rsp.code code = rsp.code
} }
@ -85,47 +63,62 @@ func Middleware(mw ...func(http.Handler) http.Handler) server.Option {
type serverKey struct{} type serverKey struct{}
// HTTPServer provide ability to pass *http.Server // Server provide ability to pass *http.Server
func HTTPServer(hs *http.Server) server.Option { func Server(hs *http.Server) server.Option {
return server.SetOption(serverKey{}, hs) return server.SetOption(serverKey{}, hs)
} }
type errorHandler func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)
type errorHandlerKey struct{} type errorHandlerKey struct{}
// ErrorHandler specifies handler for errors // ErrorHandler specifies handler for errors
func ErrorHandler(fn errorHandler) server.Option { func ErrorHandler(fn func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)) server.Option {
return server.SetOption(errorHandlerKey{}, fn) return server.SetOption(errorHandlerKey{}, fn)
} }
type ( type (
pathHandlerKey struct{} pathHandlerKey struct{}
pathHandlerVal struct { pathHandlerVal struct {
h map[string]map[string]http.HandlerFunc h map[string]http.HandlerFunc
} }
) )
// PathHandler specifies http handler for path regexp // PathHandler specifies http handler for path regexp
func PathHandler(method, path string, handler http.HandlerFunc) server.Option { func PathHandler(path string, h http.HandlerFunc) server.Option {
return func(o *server.Options) { return func(o *server.Options) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }
v, ok := o.Context.Value(pathHandlerKey{}).(*pathHandlerVal) v, ok := o.Context.Value(pathHandlerKey{}).(*pathHandlerVal)
if !ok { if !ok {
v = &pathHandlerVal{h: make(map[string]map[string]http.HandlerFunc)} v = &pathHandlerVal{h: make(map[string]http.HandlerFunc)}
} }
m, ok := v.h[method] v.h[path] = h
if !ok {
m = make(map[string]http.HandlerFunc)
v.h[method] = m
}
m[path] = handler
o.Context = context.WithValue(o.Context, pathHandlerKey{}, v) o.Context = context.WithValue(o.Context, pathHandlerKey{}, v)
} }
} }
type (
contentTypeHandlerKey struct{}
contentTypeHandlerVal struct {
h map[string]http.HandlerFunc
}
)
// ContentTypeHandler specifies http handler for Content-Type
func ContentTypeHandler(ct string, h http.HandlerFunc) server.Option {
return func(o *server.Options) {
if o.Context == nil {
o.Context = context.Background()
}
v, ok := o.Context.Value(contentTypeHandlerKey{}).(*contentTypeHandlerVal)
if !ok {
v = &contentTypeHandlerVal{h: make(map[string]http.HandlerFunc)}
}
v.h[ct] = h
o.Context = context.WithValue(o.Context, contentTypeHandlerKey{}, v)
}
}
type registerRPCHandlerKey struct{} type registerRPCHandlerKey struct{}
// RegisterRPCHandler registers compatibility endpoints with /ServiceName.ServiceEndpoint method POST // RegisterRPCHandler registers compatibility endpoints with /ServiceName.ServiceEndpoint method POST
@ -133,26 +126,7 @@ func RegisterRPCHandler(b bool) server.Option {
return server.SetOption(registerRPCHandlerKey{}, b) return server.SetOption(registerRPCHandlerKey{}, b)
} }
type registerCORSHandlerKey struct{} type headerKey struct{}
// RegisterCORSHandler registers cors endpoints with /ServiceName.ServiceEndpoint method POPTIONSOST
func RegisterCORSHandler(b bool) server.HandlerOption {
return server.SetHandlerOption(registerCORSHandlerKey{}, b)
}
type handlerEndpointsKey struct{}
type EndpointMetadata struct {
Name string
Path string
Method string
Body string
Stream bool
}
func HandlerEndpoints(md []EndpointMetadata) server.HandlerOption {
return server.SetHandlerOption(handlerEndpointsKey{}, md)
}
type handlerOptions struct { type handlerOptions struct {
headers []string headers []string

View File

@ -22,6 +22,7 @@ type rpcRequest struct {
endpoint string endpoint string
contentType string contentType string
service string service string
body []byte
stream bool stream bool
} }
@ -31,6 +32,7 @@ type rpcMessage struct {
header metadata.Metadata header metadata.Metadata
topic string topic string
contentType string contentType string
body []byte
} }
func (r *rpcRequest) ContentType() string { func (r *rpcRequest) ContentType() string {
@ -58,7 +60,11 @@ func (r *rpcRequest) Header() metadata.Metadata {
} }
func (r *rpcRequest) Read() ([]byte, error) { func (r *rpcRequest) Read() ([]byte, error) {
return nil, nil f := &codec.Frame{}
if err := r.codec.ReadBody(r.rw, f); err != nil {
return nil, err
}
return f.Data, nil
} }
func (r *rpcRequest) Stream() bool { func (r *rpcRequest) Stream() bool {
@ -77,7 +83,7 @@ func (r *rpcMessage) Topic() string {
return r.topic return r.topic
} }
func (r *rpcMessage) Body() interface{} { func (r *rpcMessage) Payload() interface{} {
return r.payload return r.payload
} }
@ -85,6 +91,10 @@ func (r *rpcMessage) Header() metadata.Metadata {
return r.header return r.header
} }
func (r *rpcMessage) Body() []byte {
return r.body
}
func (r *rpcMessage) Codec() codec.Codec { func (r *rpcMessage) Codec() codec.Codec {
return r.codec return r.codec
} }

View File

@ -1,14 +1,15 @@
package http package http
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"reflect" "reflect"
"strings" "strings"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v3/server"
) )
@ -100,7 +101,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
} }
} }
func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler { func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error { return func(p broker.Event) error {
msg := p.Message() msg := p.Message()
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
@ -110,6 +111,7 @@ func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broke
} }
hdr := metadata.Copy(msg.Header) hdr := metadata.Copy(msg.Header)
delete(hdr, "Content-Type")
ctx := metadata.NewIncomingContext(context.Background(), hdr) ctx := metadata.NewIncomingContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers)) results := make(chan error, len(sb.handlers))
@ -130,7 +132,13 @@ func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broke
req = req.Elem() req = req.Elem()
} }
if err := cf.Unmarshal(msg.Body, req.Interface()); err != nil { buf := bytes.NewBuffer(msg.Body)
if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil {
return err
}
if err := cf.ReadBody(buf, req.Interface()); err != nil {
return err return err
} }
@ -143,7 +151,7 @@ func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broke
vals = append(vals, reflect.ValueOf(ctx)) vals = append(vals, reflect.ValueOf(ctx))
} }
vals = append(vals, reflect.ValueOf(msg.Body())) vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals) returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil { if err := returnValues[0].Interface(); err != nil {
@ -152,11 +160,9 @@ func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broke
return nil return nil
} }
opts.Hooks.EachNext(func(hook options.Hook) { for i := len(opts.SubWrappers); i > 0; i-- {
if h, ok := hook.(server.HookSubHandler); ok { fn = opts.SubWrappers[i-1](fn)
fn = h(fn) }
}
})
go func() { go func() {
results <- fn(ctx, &httpMessage{ results <- fn(ctx, &httpMessage{
@ -164,6 +170,7 @@ func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broke
contentType: ct, contentType: ct,
payload: req.Interface(), payload: req.Interface(),
header: msg.Header, header: msg.Header,
body: msg.Body,
codec: cf, codec: cf,
}) })
}() }()

View File

@ -8,52 +8,8 @@ import (
"testing" "testing"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/server"
) )
func Test_Hook(t *testing.T) {
opts := server.Options{}
var fn server.HandlerFunc = func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
// fmt.Println("1")
return nil
}
var fn2 server.HandlerWrapper = func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// fmt.Println("2")
return next(ctx, req, rsp)
}
}
var fn3 server.HandlerWrapper = func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// fmt.Println("3")
return next(ctx, req, rsp)
}
}
var fn4 server.HandlerWrapper = func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// fmt.Println("4")
return next(ctx, req, rsp)
}
}
opts.Hooks = append(opts.Hooks, fn2, fn3, fn4)
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HandlerWrapper); ok {
// fmt.Printf("h %#+v\n", h)
fn = h(fn)
}
})
err := fn(nil, nil, nil)
if err != nil {
t.Fatal(err)
}
}
func TestFillrequest(t *testing.T) { func TestFillrequest(t *testing.T) {
md := metadata.New(1) md := metadata.New(1)
md.Set("ClientID", "xxx") md.Set("ClientID", "xxx")