Compare commits

..

21 Commits
v3 ... tls

Author SHA1 Message Date
32e7875fab add tls headers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-08-03 10:44:26 +03:00
93a0af7cde add tls headers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-08-03 10:40:02 +03:00
4c3d3058f6 handler/swagger: initial import
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-06-18 20:49:29 +03:00
bcd27b833a Merge pull request 'fix query param struct filling' (#168) from matchesfix into master
Reviewed-on: #168
2023-05-29 12:29:39 +03:00
090b5e3c07 fix query param struct filling
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-29 12:29:04 +03:00
95109c9dc2 Merge pull request 'add scheme to metadata' (#166) from scheme into master
Reviewed-on: #166
2023-05-19 23:28:22 +03:00
69dcf71d3f add scheme to metadata
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-19 23:24:53 +03:00
0a0a986a70 Merge pull request 'move down path handler after specific handler' (#164) from path-handler into master
Reviewed-on: #164
2023-05-19 23:04:01 +03:00
76fe748e4a move down path handler after specific handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-19 23:02:42 +03:00
9926d52f78 Merge pull request 'fix build' (#161) from fixup into master
Reviewed-on: #161
2023-05-09 18:48:11 +03:00
2ed04e3e24 fix build
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 18:47:56 +03:00
4a9cc0f03f Merge pull request 'cleanup message stuf from server' (#160) from cleanup into master
Reviewed-on: #160
2023-05-09 18:39:08 +03:00
99af727138 cleanup message stuf from server
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 18:38:49 +03:00
89fe1dd6bc Merge pull request 'export Server to allow to cast' (#159) from exportServer into master
Reviewed-on: #159
2023-05-09 18:34:39 +03:00
8b591f7fd4 export Server to allow to cast
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 18:34:24 +03:00
bec1705d09 Merge pull request 'allow to expose some method via http.HandlerFunc' (#158) from httphandler into master
Reviewed-on: #158
2023-05-08 22:23:58 +03:00
316f644090 allow to expose some method via http.HandlerFunc
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-08 22:23:34 +03:00
3fdff4312c Merge pull request 'issue-155: add swagger-ui handler' (#156) from issue-155 into master
Reviewed-on: #156
2023-05-04 02:20:49 +03:00
09657b4b67 issue-155: add swagger-ui handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-04 02:20:25 +03:00
741a6f181b Merge pull request 'move to micro v4' (#154) from v4 into master
Reviewed-on: #154
2023-04-28 22:00:02 +03:00
7f971ee6c3 move to micro v4
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-28 21:59:31 +03:00
44 changed files with 556 additions and 2842 deletions

View File

@ -1,29 +0,0 @@
name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run lint
uses: https://github.com/golangci/golangci-lint-action@v6
with:
version: 'latest'

View File

@ -1,34 +0,0 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...

View File

@ -1,53 +0,0 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: checkout tests
uses: actions/checkout@v4
with:
ref: master
filter: 'blob:none'
repository: unistack-org/micro-tests
path: micro-tests
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup go work
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
go work init
go work use .
go work use micro-tests
- name: setup deps
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: go get -v ./...
- name: run tests
env:
INTEGRATION_TESTS: yes
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
cd micro-tests
go test -mod readonly -v ./... || true

19
.github/dependabot.yml vendored Normal file
View File

@ -0,0 +1,19 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

20
.github/workflows/autoapprove.yml vendored Normal file
View File

@ -0,0 +1,20 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

21
.github/workflows/automerge.yml vendored Normal file
View File

@ -0,0 +1,21 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

47
.github/workflows/build.yml vendored Normal file
View File

@ -0,0 +1,47 @@
name: build
on:
push:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

78
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@ -0,0 +1,78 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "codeql"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: checkout
uses: actions/checkout@v3
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning.
- name: init
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: analyze
uses: github/codeql-action/analyze@v2

View File

@ -0,0 +1,27 @@
name: "dependabot-automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]'
steps:
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.3.6
with:
github-token: "${{ secrets.TOKEN }}"
- name: merge
id: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

47
.github/workflows/pr.yml vendored Normal file
View File

@ -0,0 +1,47 @@
name: prbuild
on:
pull_request:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

24
.gitignore vendored
View File

@ -1,24 +0,0 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

View File

@ -1,5 +1,44 @@
run:
concurrency: 8
concurrency: 4
deadline: 5m
issues-exit-code: 1
tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

View File

@ -9,8 +9,8 @@ to create a HTTP Server that could potentially be used for REST based API servic
import (
"net/http"
"github.com/unistack-org/micro/v3/server"
httpServer "github.com/unistack-org/micro-server-http"
"go.unistack.org/micro/v4/server"
httpServer "go.unistack.org/micro-server-http/v4"
)
func main() {
@ -37,9 +37,9 @@ Or as part of a service
import (
"net/http"
"github.com/unistack-org/micro/v3"
"github.com/unistack-org/micro/v3/server"
httpServer "github.com/unistack-org/micro-server-http"
"go.unistack.org/micro/v4"
"go.unistack.org/micro/v4/server"
httpServer "go.unistack.org/micro-server-http/v4"
)
func main() {

26
go.mod
View File

@ -1,23 +1,19 @@
module go.unistack.org/micro-server-http/v3
module go.unistack.org/micro-server-http/v4
go 1.22.7
toolchain go1.23.3
go 1.19
require (
go.unistack.org/micro-client-http/v3 v3.9.14
go.unistack.org/micro-codec-yaml/v3 v3.10.2
go.unistack.org/micro-proto/v3 v3.4.1
go.unistack.org/micro/v3 v3.10.108
golang.org/x/net v0.31.0
go.unistack.org/micro-proto/v4 v4.0.0
go.unistack.org/micro/v4 v4.0.1
golang.org/x/net v0.10.0
)
require (
github.com/google/gnostic v0.7.0 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
golang.org/x/sys v0.27.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect
google.golang.org/grpc v1.68.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/kr/text v0.2.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

1466
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -6,30 +6,23 @@ import (
"io"
"net/http"
"reflect"
"slices"
"strconv"
"strings"
"sync"
"time"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/tracer"
rhttp "go.unistack.org/micro/v3/util/http"
rflutil "go.unistack.org/micro/v3/util/reflect"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/server"
rhttp "go.unistack.org/micro/v4/util/http"
rflutil "go.unistack.org/micro/v4/util/reflect"
)
var (
DefaultErrorHandler = func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int) {
w.WriteHeader(status)
if _, cerr := w.Write([]byte(err.Error())); cerr != nil {
logger.DefaultLogger.Error(ctx, "write error", cerr)
logger.DefaultLogger.Errorf(ctx, "write failed: %v", cerr)
}
}
DefaultContentType = "application/json"
@ -122,16 +115,15 @@ func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error)
md["Method"] = r.Method
md["URL"] = r.URL.String()
md["Proto"] = r.Proto
md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength)
md["Transfer-Encoding"] = strings.Join(r.TransferEncoding, ",")
md["ContentLength"] = fmt.Sprintf("%d", r.ContentLength)
md["TransferEncoding"] = strings.Join(r.TransferEncoding, ",")
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
if r.TLS != nil {
md["TLS"] = "true"
md["TLS-ALPN"] = r.TLS.NegotiatedProtocol
md["TLS-ServerName"] = r.TLS.ServerName
md["TLS_ALPN"] = r.TLS.NegotiatedProtocol
md["TLS_ServerName"] = r.TLS.ServerName
}
ctx = metadata.NewIncomingContext(ctx, md)
path := r.URL.Path
@ -199,7 +191,8 @@ func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error)
cf, err := h.newCodec(ct)
if err != nil {
w.WriteHeader(http.StatusUnsupportedMediaType)
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
@ -278,11 +271,9 @@ func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error)
}
// wrap the handler func
h.opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookHandler); ok {
fn = h(fn)
for i := len(handler.sopts.HdlrWrappers); i > 0; i-- {
fn = handler.sopts.HdlrWrappers[i-1](fn)
}
})
if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType)
@ -311,7 +302,7 @@ func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error)
}
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
if cf, err = h.newCodec(nct); err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusInternalServerError)
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
return
}
}
@ -332,7 +323,7 @@ func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error)
}
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Error(handler.sopts.Context, "handler error", err)
handler.sopts.Logger.Errorf(handler.sopts.Context, "handler err: %v", err)
return
}
@ -342,7 +333,7 @@ func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error)
w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Error(ctx, "write failed", cerr)
handler.sopts.Logger.Errorf(ctx, "write failed: %v", cerr)
}
}, nil
}
@ -353,11 +344,8 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ct = htype
}
ts := time.Now()
ctx := context.WithValue(r.Context(), rspCodeKey{}, &rspCodeVal{})
ctx = context.WithValue(ctx, rspHeaderKey{}, &rspHeaderVal{})
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(len(r.Header) + 8)
@ -381,7 +369,6 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
ctx = metadata.NewIncomingContext(ctx, md)
ctx = metadata.NewOutgoingContext(ctx, metadata.New(0))
path := r.URL.Path
if !strings.HasPrefix(path, "/") {
@ -433,116 +420,21 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
var sp tracer.Span
if !match && h.hd != nil {
if hdlr, ok := h.hd.Handler().(http.Handler); ok {
endpointName := fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name)
if !slices.Contains(tracer.DefaultSkipEndpoints, endpointName) {
ctx, sp = h.opts.Tracer.Start(ctx, "rpc-server",
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", endpointName,
),
)
defer func() {
n := GetRspCode(ctx)
if s, _ := sp.Status(); s != tracer.SpanStatusError && n > 399 {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(n))
}
sp.Finish()
}()
}
if !slices.Contains(meter.DefaultSkipEndpoints, endpointName) {
h.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", endpointName, "server", "http").Inc()
defer func() {
n := GetRspCode(ctx)
if n > 399 {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", endpointName, "server", "http", "status", "success", "code", strconv.Itoa(n)).Inc()
} else {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", endpointName, "server", "http", "status", "failure", "code", strconv.Itoa(n)).Inc()
}
te := time.Since(ts)
h.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", endpointName, "server", "http").Update(te.Seconds())
h.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", endpointName, "server", "http").Update(te.Seconds())
h.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", endpointName, "server", "http").Dec()
}()
}
hdlr.ServeHTTP(w, r.WithContext(ctx))
hdlr.ServeHTTP(w, r)
return
}
} else if !match {
// check for http.HandlerFunc handlers
if !slices.Contains(tracer.DefaultSkipEndpoints, r.URL.Path) {
ctx, sp = h.opts.Tracer.Start(ctx, "rpc-server",
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", r.URL.Path,
"server", "http",
),
)
defer func() {
if n := GetRspCode(ctx); n > 399 {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(n))
} else {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(http.StatusNotFound))
}
sp.Finish()
}()
}
if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r.WithContext(ctx))
ph.(http.HandlerFunc)(w, r)
return
}
h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound)
return
}
endpointName := fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name)
topts := []tracer.SpanOption{
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", endpointName,
"server", "http",
),
}
if slices.Contains(tracer.DefaultSkipEndpoints, endpointName) {
topts = append(topts, tracer.WithSpanRecord(false))
}
ctx, sp = h.opts.Tracer.Start(ctx, "rpc-server", topts...)
if !slices.Contains(meter.DefaultSkipEndpoints, handler.name) {
defer func() {
te := time.Since(ts)
h.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", handler.name, "server", "http").Update(te.Seconds())
h.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", handler.name, "server", "http").Update(te.Seconds())
h.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", handler.name, "server", "http").Dec()
n := GetRspCode(ctx)
if n > 399 {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", handler.name, "server", "http", "status", "failure", "code", strconv.Itoa(n)).Inc()
} else {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", handler.name, "server", "http", "status", "success", "code", strconv.Itoa(n)).Inc()
}
}()
}
defer func() {
n := GetRspCode(ctx)
if n > 399 {
if s, _ := sp.Status(); s != tracer.SpanStatusError {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(n))
}
}
sp.Finish()
}()
// get fields from url values
if len(r.URL.RawQuery) > 0 {
umd, cerr := rflutil.URLMap(r.URL.RawQuery)
@ -638,18 +530,13 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
metadata.SetOutgoingContext(ctx, md)
if err != nil && sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
return err
}
h.opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookHandler); ok {
fn = h(fn)
// wrap the handler func
for i := len(handler.sopts.HdlrWrappers); i > 0; i-- {
fn = handler.sopts.HdlrWrappers[i-1](fn)
}
})
if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType)
@ -678,7 +565,7 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
if cf, err = h.newCodec(nct); err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusInternalServerError)
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
return
}
}
@ -698,18 +585,17 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
buf, err = cf.Marshal(replyv.Interface())
}
if err != nil {
if handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Error(handler.sopts.Context, "handler error", err)
}
scode = http.StatusInternalServerError
} else if nscode := GetRspCode(ctx); nscode != 0 {
scode = nscode
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Errorf(handler.sopts.Context, "handler err: %v", err)
return
}
if nscode := GetRspCode(ctx); nscode != 0 {
scode = nscode
}
w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Error(ctx, "respoonse write error", cerr)
handler.sopts.Logger.Errorf(ctx, "write failed: %v", cerr)
}
}

View File

@ -2,11 +2,11 @@ package handler
import (
// import required packages
_ "go.unistack.org/micro-proto/v3/openapiv3"
_ "go.unistack.org/micro-proto/v4/openapiv3"
)
//go:generate sh -c "curl -L https://github.com/swagger-api/swagger-ui/archive/refs/tags/v4.18.3.zip -o - | bsdtar -C swagger-ui --strip-components=2 -xv swagger-ui-4.18.3/dist && rm swagger-ui/*.map swagger-ui/*-es-*.js swagger-ui/swagger-ui.js swagger-ui/swagger-initializer.js"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./meter/meter.proto"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./meter/meter.proto"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./health/health.proto"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./health/health.proto"

View File

@ -1,77 +0,0 @@
//go:build ignore
package graphql_handler
import (
"context"
"fmt"
"github.com/99designs/gqlgen/graphql"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/store"
)
var _ graphql.Cache = (*cacheWrapper)(nil)
type Handler struct {
opts Options
}
type Option func(*Options)
type Options struct {
cache *cacheWrapper
Path string
}
type cacheWrapper struct {
s store.Store
l logger.Logger
}
func (c *cacheWrapper) Get(ctx context.Context, key string) (interface{}, bool) {
var val interface{}
if err := c.s.Read(ctx, key, val); err != nil && err != store.ErrNotFound {
c.l.Error(ctx, fmt.Sprintf("cache.Get %s failed", key), err)
return nil, false
}
return val, true
}
func (c *cacheWrapper) Add(ctx context.Context, key string, val interface{}) {
if err := c.s.Write(ctx, key, val); err != nil {
c.l.Error(ctx, fmt.Sprintf("cache.Add %s failed", key), err)
}
}
func Store(s store.Store) Option {
return func(o *Options) {
if o.cache == nil {
o.cache = &cacheWrapper{}
}
o.cache.s = s
}
}
func Logger(l logger.Logger) Option {
return func(o *Options) {
if o.cache == nil {
o.cache = &cacheWrapper{}
}
o.cache.l = l
}
}
func Path(path string) Option {
return func(o *Options) {
o.Path = path
}
}
func NewHandler(opts ...Option) *Handler {
options := Options{}
for _, o := range opts {
o(&options)
}
return &Handler{opts: options}
}

View File

@ -1,42 +1,27 @@
package health_handler
package health // import "go.unistack.org/micro-server-http/v4/handler/health"
import (
"context"
codecpb "go.unistack.org/micro-proto/v3/codec"
"go.unistack.org/micro/v3/errors"
codecpb "go.unistack.org/micro-proto/v4/codec"
"go.unistack.org/micro/v4/errors"
)
var _ HealthServiceServer = &Handler{}
var _ HealthServiceServer = (*Handler)(nil)
type Handler struct {
opts Options
}
type (
CheckFunc func(context.Context) error
Option func(*Options)
)
type CheckFunc func(context.Context) error
type Stater interface {
Live() bool
Ready() bool
Health() bool
}
type Option func(*Options)
type Options struct {
Version string
Name string
Staters []Stater
LiveChecks []CheckFunc
ReadyChecks []CheckFunc
HealthChecks []CheckFunc
}
func Service(s ...Stater) Option {
return func(o *Options) {
o.Staters = append(o.Staters, s...)
}
}
func LiveChecks(fns ...CheckFunc) Option {
@ -51,12 +36,6 @@ func ReadyChecks(fns ...CheckFunc) Option {
}
}
func HealthChecks(fns ...CheckFunc) Option {
return func(o *Options) {
o.HealthChecks = append(o.HealthChecks, fns...)
}
}
func Name(name string) Option {
return func(o *Options) {
o.Name = name
@ -77,51 +56,18 @@ func NewHandler(opts ...Option) *Handler {
return &Handler{opts: options}
}
func (h *Handler) Healthy(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
var err error
for _, s := range h.opts.Staters {
if !s.Health() {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
for _, fn := range h.opts.HealthChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
return nil
}
func (h *Handler) Live(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
var err error
for _, s := range h.opts.Staters {
if !s.Live() {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
for _, fn := range h.opts.LiveChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
return nil
}
func (h *Handler) Ready(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
var err error
for _, s := range h.opts.Staters {
if !s.Ready() {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
for _, fn := range h.opts.ReadyChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)

View File

@ -1,29 +1,13 @@
syntax = "proto3";
package micro.server.http.v3.handler.health;
option go_package = "go.unistack.org/micro-server-http/v3/handler/health;health_handler";
package micro.server.http.v4.handler.health;
option go_package = "go.unistack.org/micro-server-http/v4/handler/health;health";
import "api/annotations.proto";
import "openapiv3/annotations.proto";
import "codec/frame.proto";
service HealthService {
rpc Healthy(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Healthy";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = {
get: "/health";
additional_bindings: { get: "/healthz"; }
};
};
rpc Live(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Live";
@ -35,10 +19,7 @@ service HealthService {
};
};
};
option (micro.api.http) = {
get: "/live";
additional_bindings: { get: "/livez"; }
};
option (micro.api.http) = { get: "/live"; };
};
rpc Ready(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
@ -51,9 +32,7 @@ service HealthService {
};
};
};
option (micro.api.http) = { get: "/ready";
additional_bindings: { get: "/readyz"; }
};
option (micro.api.http) = { get: "/ready"; };
};
rpc Version(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {

View File

@ -1,30 +1,47 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions:
// - protoc-gen-go-micro v3.10.4
// - protoc v5.28.3
// - protoc-gen-go-micro v4.0.0
// - protoc v3.21.12
// source: health/health.proto
package health_handler
package health
import (
context "context"
codec "go.unistack.org/micro-proto/v3/codec"
client "go.unistack.org/micro/v3/client"
codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4"
)
var (
HealthServiceName = "HealthService"
)
type HealthServiceClient interface {
Healthy(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
Live(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
Ready(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
Version(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
}
var (
HealthServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type HealthServiceServer interface {
Healthy(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error

View File

@ -1,162 +1,20 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.10.4
// protoc-gen-go-micro version: v4.0.0
// source: health/health.proto
package health_handler
package health
import (
context "context"
v31 "go.unistack.org/micro-client-http/v3"
codec "go.unistack.org/micro-proto/v3/codec"
v3 "go.unistack.org/micro-server-http/v3"
client "go.unistack.org/micro/v3/client"
server "go.unistack.org/micro/v3/server"
http "net/http"
codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4"
server "go.unistack.org/micro/v4/server"
)
var (
HealthServiceServerEndpoints = []v3.EndpointMetadata{
{
Name: "HealthService.Healthy",
Path: "/health",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Healthy",
Path: "/healthz",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Live",
Path: "/livez",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/readyz",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type healthServiceClient struct {
c client.Client
name string
}
func NewHealthServiceClient(name string, c client.Client) HealthServiceClient {
return &healthServiceClient{c: c, name: name}
}
func (c *healthServiceClient) Healthy(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/health"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Healthy", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
func (c *healthServiceClient) Live(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/live"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Live", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
func (c *healthServiceClient) Ready(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/ready"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Ready", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
func (c *healthServiceClient) Version(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/version"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Version", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
type healthServiceServer struct {
HealthServiceServer
}
func (h *healthServiceServer) Healthy(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServiceServer.Healthy(ctx, req, rsp)
}
func (h *healthServiceServer) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServiceServer.Live(ctx, req, rsp)
}
@ -171,7 +29,6 @@ func (h *healthServiceServer) Version(ctx context.Context, req *codec.Frame, rsp
func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts ...server.HandlerOption) error {
type healthService interface {
Healthy(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
@ -181,6 +38,6 @@ func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts .
}
h := &healthServiceServer{sh}
var nopts []server.HandlerOption
nopts = append(nopts, v3.HandlerEndpoints(HealthServiceServerEndpoints))
nopts = append(nopts, v4.HandlerEndpoints(HealthServiceServerEndpoints))
return s.Handle(s.NewHandler(&HealthService{h}, append(nopts, opts...)...))
}

View File

@ -1,41 +1,19 @@
package meter_handler
package meter // import "go.unistack.org/micro-server-http/v4/handler/meter"
import (
"bytes"
"compress/gzip"
"context"
"io"
"strings"
"sync"
codecpb "go.unistack.org/micro-proto/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
codecpb "go.unistack.org/micro-proto/v4/codec"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/meter"
)
const (
contentEncodingHeader = "Content-Encoding"
acceptEncodingHeader = "Accept-Encoding"
)
var gzipPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
// guard to fail early
var _ MeterServiceServer = &Handler{}
var _ MeterServiceServer = (*Handler)(nil)
type Handler struct {
Options Options
opts Options
}
type Option func(*Options)
@ -44,7 +22,6 @@ type Options struct {
Meter meter.Meter
Name string
MeterOptions []meter.Option
DisableCompress bool
}
func Meter(m meter.Meter) Option {
@ -59,12 +36,6 @@ func Name(name string) Option {
}
}
func DisableCompress(g bool) Option {
return func(o *Options) {
o.DisableCompress = g
}
}
func MeterOptions(opts ...meter.Option) Option {
return func(o *Options) {
o.MeterOptions = append(o.MeterOptions, opts...)
@ -72,7 +43,7 @@ func MeterOptions(opts ...meter.Option) Option {
}
func NewOptions(opts ...Option) Options {
options := Options{Meter: meter.DefaultMeter, DisableCompress: false}
options := Options{Meter: meter.DefaultMeter}
for _, o := range opts {
o(&options)
}
@ -81,52 +52,16 @@ func NewOptions(opts ...Option) Options {
func NewHandler(opts ...Option) *Handler {
options := NewOptions(opts...)
return &Handler{Options: options}
return &Handler{opts: options}
}
func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
log, ok := logger.FromContext(ctx)
if !ok {
log = logger.DefaultLogger
}
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
w := io.Writer(buf)
if md, ok := metadata.FromOutgoingContext(ctx); gzipAccepted(md) && ok && !h.Options.DisableCompress {
omd, _ := metadata.FromOutgoingContext(ctx)
omd.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
gz.Flush()
}
if err := h.Options.Meter.Write(w, h.Options.MeterOptions...); err != nil {
log.Error(ctx, "http/meter write failed", err)
return nil
buf := bytes.NewBuffer(nil)
if err := h.opts.Meter.Write(buf, h.opts.MeterOptions...); err != nil {
return errors.InternalServerError(h.opts.Name, "%v", err)
}
rsp.Data = buf.Bytes()
return nil
}
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(md metadata.Metadata) bool {
a, ok := md.Get(acceptEncodingHeader)
if !ok {
return false
}
if strings.Contains(a, "gzip") {
return true
}
return false
}

View File

@ -1,7 +1,7 @@
syntax = "proto3";
package micro.server.http.v3.handler.meter;
option go_package = "go.unistack.org/micro-server-http/v3/handler/meter;meter_handler";
package micro.server.http.v4.handler.meter;
option go_package = "go.unistack.org/micro-server-http/v4/handler/meter;meter";
import "api/annotations.proto";
import "openapiv3/annotations.proto";

View File

@ -1,24 +1,31 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions:
// - protoc-gen-go-micro v3.10.4
// - protoc v5.28.3
// - protoc-gen-go-micro v4.0.0
// - protoc v3.21.12
// source: meter/meter.proto
package meter_handler
package meter
import (
context "context"
codec "go.unistack.org/micro-proto/v3/codec"
client "go.unistack.org/micro/v3/client"
codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4"
)
var (
MeterServiceName = "MeterService"
)
type MeterServiceClient interface {
Metrics(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
}
var (
MeterServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type MeterServiceServer interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error

View File

@ -1,58 +1,16 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.10.4
// protoc-gen-go-micro version: v4.0.0
// source: meter/meter.proto
package meter_handler
package meter
import (
context "context"
v31 "go.unistack.org/micro-client-http/v3"
codec "go.unistack.org/micro-proto/v3/codec"
v3 "go.unistack.org/micro-server-http/v3"
client "go.unistack.org/micro/v3/client"
server "go.unistack.org/micro/v3/server"
http "net/http"
codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4"
server "go.unistack.org/micro/v4/server"
)
var (
MeterServiceServerEndpoints = []v3.EndpointMetadata{
{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type meterServiceClient struct {
c client.Client
name string
}
func NewMeterServiceClient(name string, c client.Client) MeterServiceClient {
return &meterServiceClient{c: c, name: name}
}
func (c *meterServiceClient) Metrics(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/metrics"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "MeterService.Metrics", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
type meterServiceServer struct {
MeterServiceServer
}
@ -70,6 +28,6 @@ func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...
}
h := &meterServiceServer{sh}
var nopts []server.HandlerOption
nopts = append(nopts, v3.HandlerEndpoints(MeterServiceServerEndpoints))
nopts = append(nopts, v4.HandlerEndpoints(MeterServiceServerEndpoints))
return s.Handle(s.NewHandler(&MeterService{h}, append(nopts, opts...)...))
}

View File

@ -1,46 +0,0 @@
package pprof_handler
import (
"expvar"
"net/http"
"net/http/pprof"
"path"
"strings"
)
func NewHandler(prefixPath string, initFuncs ...func()) http.HandlerFunc {
for _, fn := range initFuncs {
fn()
}
return func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.EqualFold(r.RequestURI, prefixPath) && r.RequestURI[len(r.RequestURI)-1] != '/':
http.Redirect(w, r, r.RequestURI+"/", http.StatusMovedPermanently)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "cmdline")):
pprof.Cmdline(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "profile")):
pprof.Profile(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "symbol")):
pprof.Symbol(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "trace")):
pprof.Trace(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "goroutine")):
pprof.Handler("goroutine").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "threadcreate")):
pprof.Handler("threadcreate").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "mutex")):
pprof.Handler("mutex").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "heap")):
pprof.Handler("heap").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "block")):
pprof.Handler("block").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "allocs")):
pprof.Handler("allocs").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "vars")):
expvar.Handler().ServeHTTP(w, r)
default:
pprof.Index(w, r)
}
}
}

View File

@ -1,19 +0,0 @@
package spa
import (
"io/fs"
"net/http"
"strings"
)
// Handler serve files from dir and redirect to index if file not exists
var Handler = func(prefix string, dir fs.FS) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
f := http.StripPrefix(prefix, http.FileServer(http.FS(dir)))
if _, err := fs.Stat(dir, strings.TrimPrefix(r.RequestURI, prefix)); err != nil {
r.RequestURI = prefix
r.URL.Path = prefix
}
f.ServeHTTP(w, r)
}
}

View File

@ -0,0 +1,20 @@
window.onload = function() {
//<editor-fold desc="Changeable Configuration Block">
// the following lines will be replaced by docker/configurator, when it runs in a docker-container
window.ui = SwaggerUIBundle({
url: "{{ .SWAGGER }}",
dom_id: '#swagger-ui',
deepLinking: true,
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIStandalonePreset
],
plugins: [
SwaggerUIBundle.plugins.DownloadUrl
],
layout: "StandaloneLayout"
});
//</editor-fold>
};

View File

@ -1,4 +1,4 @@
package swaggerui_handler
package swaggerui // import "go.unistack.org/micro-server-http/v4/handler/swagger-ui"
import (
"embed"

View File

@ -1,4 +1,4 @@
package swaggerui_handler
package swaggerui
import (
"net/http"

View File

@ -1,18 +1,24 @@
package swagger_handler
package swagger // import "go.unistack.org/micro-server-http/v4/handler/swagger"
import (
"context"
"io/fs"
"net/http"
yamlcodec "go.unistack.org/micro-codec-yaml/v3"
rutil "go.unistack.org/micro/v3/util/reflect"
httpsrv "go.unistack.org/micro-server-http/v4"
"go.unistack.org/micro/v4/server"
)
type (
Hook func([]byte) []byte
ErrorHandler func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)
)
// Handler append to generated swagger data from dst map[string]interface{}
var Handler = func(dst map[string]interface{}, fsys fs.FS) http.HandlerFunc {
c := yamlcodec.NewCodec()
var Handler = func(fsys fs.FS, hooks []Hook, h httpsrv.ErrorHandler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
h(r.Context())
w.WriteHeader(http.StatusNotFound)
return
}
@ -29,32 +35,6 @@ var Handler = func(dst map[string]interface{}, fsys fs.FS) http.HandlerFunc {
return
}
if dst == nil {
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
return
}
var src interface{}
if err = c.Unmarshal(buf, src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if err = rutil.Merge(src, dst); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if buf, err = c.Marshal(src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
}

260
http.go
View File

@ -1,5 +1,5 @@
// Package http implements a go-micro.Server
package http // import "go.unistack.org/micro-server-http/v3"
package http // import "go.unistack.org/micro-server-http/v4"
import (
"context"
@ -9,18 +9,15 @@ import (
"net"
"net/http"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
rhttp "go.unistack.org/micro/v3/util/http"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/server"
rhttp "go.unistack.org/micro/v4/util/http"
"golang.org/x/net/netutil"
)
@ -31,13 +28,9 @@ type Server struct {
rsvc *register.Service
handlers map[string]server.Handler
exit chan chan error
subscribers map[*httpSubscriber][]broker.Subscriber
errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int)
pathHandlers *rhttp.Trie
opts server.Options
stateLive *atomic.Uint32
stateReady *atomic.Uint32
stateHealth *atomic.Uint32
registerRPC bool
sync.RWMutex
registered bool
@ -105,10 +98,6 @@ func (h *Server) Init(opts ...server.Option) error {
h.RUnlock()
return err
}
if err := h.opts.Broker.Init(); err != nil {
h.RUnlock()
return err
}
if err := h.opts.Tracer.Init(); err != nil {
h.RUnlock()
return err
@ -121,7 +110,10 @@ func (h *Server) Init(opts ...server.Option) error {
h.RUnlock()
return err
}
if err := h.opts.Transport.Init(); err != nil {
h.RUnlock()
return err
}
h.RUnlock()
h.Lock()
@ -192,11 +184,6 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
}
*/
registerCORS := false
if v, ok := options.Context.Value(registerCORSHandlerKey{}).(bool); ok && v {
registerCORS = true
}
for hn, md := range options.Metadata {
var method reflect.Method
mname := hn[strings.Index(hn, ".")+1:]
@ -210,16 +197,16 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
}
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname))
h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname)
continue
}
mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, "endpoint error", err)
h.opts.Logger.Errorf(h.opts.Context, "%v", err)
continue
} else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
h.opts.Logger.Errorf(h.opts.Context, "nil mtype for %s", mname)
continue
}
@ -229,23 +216,14 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
pth := &patHandler{mtype: mtype, name: name, rcvr: rcvr}
hdlr.name = name
methods := []string{md["Method"]}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
if err := hdlr.handlers.Insert(methods, md["Path"], pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %v %s", methods, md["Path"]))
if err := hdlr.handlers.Insert([]string{md["Method"]}, md["Path"], pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md["Method"], md["Path"])
}
if h.registerRPC {
methods := []string{http.MethodPost}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
if err := hdlr.handlers.Insert(methods, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn)
}
}
}
@ -269,16 +247,16 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
}
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname))
h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname)
continue
}
mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, "prepare endpoint error", err)
h.opts.Logger.Errorf(h.opts.Context, "%v", err)
continue
} else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
h.opts.Logger.Errorf(h.opts.Context, "nil mtype for %s", mname)
continue
}
@ -288,24 +266,14 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
pth := &patHandler{mtype: mtype, name: name, rcvr: rcvr}
hdlr.name = name
methods := []string{md.Method}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
if err := hdlr.handlers.Insert(methods, md.Path, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %s %s", md.Method, md.Path))
if err := hdlr.handlers.Insert([]string{md.Method}, md.Path, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md.Method, md.Path)
}
if h.registerRPC {
methods := []string{http.MethodPost}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
h.opts.Logger.Info(h.opts.Context, fmt.Sprintf("register rpc handler for http.MethodPost %s /%s", hn, hn))
if err := hdlr.handlers.Insert(methods, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn)
}
}
}
@ -313,35 +281,6 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
return hdlr
}
func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, handler, opts...)
}
func (h *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*httpSubscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := server.ValidateSubscriber(sb); err != nil {
return err
}
h.RLock()
_, ok = h.subscribers[sub]
h.RUnlock()
if ok {
return fmt.Errorf("subscriber %v already exists", h)
}
h.Lock()
h.subscribers[sub] = nil
h.Unlock()
return nil
}
func (h *Server) Register() error {
var eps []*register.Endpoint
h.RLock()
@ -367,28 +306,13 @@ func (h *Server) Register() error {
service.Nodes[0].Metadata["protocol"] = "http"
service.Endpoints = eps
h.Lock()
subscriberList := make([]*httpSubscriber, 0, len(h.subscribers))
for e := range h.subscribers {
// Only advertise non internal subscribers
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
for _, e := range subscriberList {
service.Endpoints = append(service.Endpoints, e.Endpoints()...)
}
h.Unlock()
h.RLock()
registered := h.registered
h.RUnlock()
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
}
}
@ -403,7 +327,6 @@ func (h *Server) Register() error {
}
h.Lock()
h.registered = true
h.rsvc = service
h.Unlock()
@ -411,35 +334,6 @@ func (h *Server) Register() error {
return nil
}
func (h *Server) subscribe() error {
config := h.opts
for sb := range h.subscribers {
handler := h.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
h.Unlock()
return err
}
h.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (h *Server) Deregister() error {
h.RLock()
config := h.opts
@ -451,7 +345,7 @@ func (h *Server) Deregister() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Deregistering node: "+service.Nodes[0].ID)
config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID)
}
if err := server.DefaultDeregisterFunc(service, config); err != nil {
@ -467,23 +361,6 @@ func (h *Server) Deregister() error {
}
h.registered = false
subCtx := h.opts.Context
for sb, subs := range h.subscribers {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
for _, sub := range subs {
config.Logger.Info(config.Context, "Unsubscribing from topic: "+sub.Topic())
if err := sub.Unsubscribe(subCtx); err != nil {
h.Unlock()
config.Logger.Error(config.Context, fmt.Sprintf("failed to unsubscribe topic: %s, error", sb.Topic()), err)
return err
}
}
h.subscribers[sb] = nil
}
h.Unlock()
return nil
}
@ -518,7 +395,7 @@ func (h *Server) Start() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Listening on "+ts.Addr().String())
config.Logger.Infof(config.Context, "Listening on %s", ts.Addr().String())
}
h.Lock()
@ -526,6 +403,7 @@ func (h *Server) Start() error {
h.Unlock()
var handler http.Handler
var srvFunc func(net.Listener) error
// nolint: nestif
if h.opts.Context != nil {
@ -556,13 +434,9 @@ func (h *Server) Start() error {
return fmt.Errorf("cant process with nil handler")
}
if err := config.Broker.Connect(h.opts.Context); err != nil {
return err
}
if err := config.RegisterCheck(h.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), err)
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err)
}
} else {
if err = h.Register(); err != nil {
@ -570,13 +444,8 @@ func (h *Server) Start() error {
}
}
if err := h.subscribe(); err != nil {
return err
}
fn := handler
var hs *http.Server
if h.opts.Context != nil {
if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 {
// wrap the handler func
@ -584,22 +453,25 @@ func (h *Server) Start() error {
fn = mwf[i-1](fn)
}
}
var ok bool
if hs, ok = h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
hs.Handler = fn
} else {
hs = &http.Server{Handler: fn}
srvFunc = hs.Serve
}
}
if srvFunc != nil {
go func() {
if cerr := hs.Serve(ts); cerr != nil && !errors.Is(cerr, http.ErrServerClosed) {
h.opts.Logger.Error(h.opts.Context, "serve error", cerr)
if cerr := srvFunc(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
h.opts.Logger.Error(h.opts.Context, cerr)
}
h.stateLive.Store(0)
h.stateReady.Store(0)
h.stateHealth.Store(0)
}()
} else {
go func() {
if cerr := http.Serve(ts, fn); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
h.opts.Logger.Error(h.opts.Context, cerr)
}
}()
}
go func() {
t := new(time.Ticker)
@ -625,28 +497,28 @@ func (h *Server) Start() error {
// nolint: nestif
if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
}
// deregister self in case of error
if err := h.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error", config.Name, config.ID), err)
config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err)
}
}
} else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), rerr)
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr)
}
continue
}
if err := h.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error", config.Name, config.ID), err)
config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err)
}
}
if err := h.Register(); err != nil {
config.Logger.Error(config.Context, "Server register error", err)
config.Logger.Errorf(config.Context, "Server register error: %s", err)
}
// wait for exit
case ch = <-h.exit:
@ -656,28 +528,12 @@ func (h *Server) Start() error {
// deregister
if err := h.Deregister(); err != nil {
config.Logger.Error(config.Context, "Server deregister error", err)
config.Logger.Errorf(config.Context, "Server deregister error: %s", err)
}
if err := config.Broker.Disconnect(config.Context); err != nil {
config.Logger.Error(config.Context, "Broker disconnect error", err)
}
ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout)
defer cancel()
err := hs.Shutdown(ctx)
if err != nil {
err = hs.Close()
}
ch <- err
ch <- ts.Close()
}()
h.stateLive.Store(1)
h.stateReady.Store(1)
h.stateHealth.Store(1)
return nil
}
@ -695,18 +551,6 @@ func (h *Server) Name() string {
return h.opts.Name
}
func (h *Server) Live() bool {
return h.stateLive.Load() == 1
}
func (h *Server) Ready() bool {
return h.stateReady.Load() == 1
}
func (h *Server) Health() bool {
return h.stateHealth.Load() == 1
}
func NewServer(opts ...server.Option) *Server {
options := server.NewOptions(opts...)
eh := DefaultErrorHandler
@ -714,12 +558,8 @@ func NewServer(opts ...server.Option) *Server {
eh = v
}
return &Server{
stateLive: &atomic.Uint32{},
stateReady: &atomic.Uint32{},
stateHealth: &atomic.Uint32{},
opts: options,
exit: make(chan chan error),
subscribers: make(map[*httpSubscriber][]broker.Subscriber),
errorHandler: eh,
pathHandlers: rhttp.NewTrie(),
}

View File

@ -1,8 +1,8 @@
package http
import (
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
)
type httpMessage struct {
@ -11,6 +11,7 @@ type httpMessage struct {
header metadata.Metadata
topic string
contentType string
body []byte
}
func (r *httpMessage) Topic() string {

View File

@ -5,7 +5,7 @@ import (
"fmt"
"net/http"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/server"
)
// SetError pass error to caller
@ -69,7 +69,7 @@ func getRspHeader(ctx context.Context) http.Header {
// GetRspCode used internally by generated http server handler
func GetRspCode(ctx context.Context) int {
code := int(200)
var code int
if rsp, ok := ctx.Value(rspCodeKey{}).(*rspCodeVal); ok {
code = rsp.code
}
@ -133,13 +133,6 @@ func RegisterRPCHandler(b bool) server.Option {
return server.SetOption(registerRPCHandlerKey{}, b)
}
type registerCORSHandlerKey struct{}
// RegisterCORSHandler registers cors endpoints with /ServiceName.ServiceEndpoint method POPTIONSOST
func RegisterCORSHandler(b bool) server.HandlerOption {
return server.SetHandlerOption(registerCORSHandlerKey{}, b)
}
type handlerEndpointsKey struct{}
type EndpointMetadata struct {

View File

@ -3,15 +3,12 @@ package http
import (
"io"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/server"
)
var (
_ server.Request = &rpcRequest{}
_ server.Message = &rpcMessage{}
)
var _ server.Request = &rpcRequest{}
type rpcRequest struct {
rw io.ReadWriter
@ -25,14 +22,6 @@ type rpcRequest struct {
stream bool
}
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
}
func (r *rpcRequest) ContentType() string {
return r.contentType
}
@ -58,7 +47,11 @@ func (r *rpcRequest) Header() metadata.Metadata {
}
func (r *rpcRequest) Read() ([]byte, error) {
return nil, nil
f := &codec.Frame{}
if err := r.codec.ReadBody(r.rw, f); err != nil {
return nil, err
}
return f.Data, nil
}
func (r *rpcRequest) Stream() bool {
@ -68,23 +61,3 @@ func (r *rpcRequest) Stream() bool {
func (r *rpcRequest) Body() interface{} {
return r.payload
}
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Body() interface{} {
return r.payload
}
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}
func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}

View File

@ -7,9 +7,11 @@ import (
"unicode"
"unicode/utf8"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/server"
)
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type methodType struct {
ArgType reflect.Type
ReplyType reflect.Type

View File

@ -1,202 +0,0 @@
package http
import (
"context"
"fmt"
"reflect"
"strings"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
)
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type handler struct {
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type httpSubscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*register.Endpoint
opts server.SubscriberOptions
}
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.NewSubscriberOptions(opts...)
var endpoints []*register.Endpoint
var handlers []*handler
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: "Func",
Request: register.ExtractSubValue(typ),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: name + "." + method.Name,
Request: register.ExtractSubValue(method.Type),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
}
}
return &httpSubscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
}
}
func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
if err != nil {
return err
}
hdr := metadata.Copy(msg.Header)
ctx := metadata.NewIncomingContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
if err := cf.Unmarshal(msg.Body, req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg server.Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
return err.(error)
}
return nil
}
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookSubHandler); ok {
fn = h(fn)
}
})
go func() {
results <- fn(ctx, &httpMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
codec: cf,
})
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return nil
}
}
func (s *httpSubscriber) Topic() string {
return s.topic
}
func (s *httpSubscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *httpSubscriber) Endpoints() []*register.Endpoint {
return s.endpoints
}
func (s *httpSubscriber) Options() server.SubscriberOptions {
return s.opts
}

View File

@ -5,8 +5,8 @@ import (
"net/http"
"strings"
"go.unistack.org/micro/v3/metadata"
rutil "go.unistack.org/micro/v3/util/reflect"
"go.unistack.org/micro/v4/metadata"
rutil "go.unistack.org/micro/v4/util/reflect"
)
func FillRequest(ctx context.Context, req interface{}, opts ...FillRequestOption) error {

View File

@ -7,53 +7,9 @@ import (
"strings"
"testing"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/metadata"
)
func Test_Hook(t *testing.T) {
opts := server.Options{}
var fn server.HandlerFunc = func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
// fmt.Println("1")
return nil
}
var fn2 server.HandlerWrapper = func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// fmt.Println("2")
return next(ctx, req, rsp)
}
}
var fn3 server.HandlerWrapper = func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// fmt.Println("3")
return next(ctx, req, rsp)
}
}
var fn4 server.HandlerWrapper = func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// fmt.Println("4")
return next(ctx, req, rsp)
}
}
opts.Hooks = append(opts.Hooks, fn2, fn3, fn4)
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HandlerWrapper); ok {
// fmt.Printf("h %#+v\n", h)
fn = h(fn)
}
})
err := fn(nil, nil, nil)
if err != nil {
t.Fatal(err)
}
}
func TestFillrequest(t *testing.T) {
md := metadata.New(1)
md.Set("ClientID", "xxx")