Compare commits

..

No commits in common. "v3" and "upstream" have entirely different histories.
v3 ... upstream

49 changed files with 1001 additions and 4875 deletions

View File

@ -1,24 +0,0 @@
---
name: Bug report
about: For reporting bugs in go-micro
title: "[BUG]"
labels: ''
assignees: ''
---
**Describe the bug**
1. What are you trying to do?
2. What did you expect to happen?
3. What happens instead?
**How to reproduce the bug:**
If possible, please include a minimal code snippet here.
**Environment:**
Go Version: please paste `go version` output here
```
please paste `go env` output here
```

View File

@ -1,17 +0,0 @@
---
name: Feature request / Enhancement
about: If you have a need not served by go-micro
title: "[FEATURE]"
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Additional context**
Add any other context or screenshots about the feature request here.

View File

@ -1,14 +0,0 @@
---
name: Question
about: Ask a question about go-micro
title: ''
labels: ''
assignees: ''
---
Before asking, please check if your question has already been answered:
1. Check the documentation - https://micro.mu/docs/
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
3. Search existing issues

View File

@ -1,9 +0,0 @@
## Pull Request template
Please, go through these steps before clicking submit on this PR.
1. Give a descriptive title to your PR.
2. Provide a description of your changes.
3. Make sure you have some relevant tests.
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**

View File

@ -1,29 +0,0 @@
name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run lint
uses: https://github.com/golangci/golangci-lint-action@v6
with:
version: 'latest'

View File

@ -1,34 +0,0 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...

View File

@ -1,53 +0,0 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: checkout tests
uses: actions/checkout@v4
with:
ref: master
filter: 'blob:none'
repository: unistack-org/micro-tests
path: micro-tests
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup go work
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
go work init
go work use .
go work use micro-tests
- name: setup deps
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: go get -v ./...
- name: run tests
env:
INTEGRATION_TESTS: yes
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
cd micro-tests
go test -mod readonly -v ./... || true

24
.gitignore vendored
View File

@ -1,24 +0,0 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

View File

@ -1,5 +0,0 @@
run:
concurrency: 8
deadline: 5m
issues-exit-code: 1
tests: true

View File

@ -1 +0,0 @@
7e753b1f84b06797ce8c4be8e4b1941b45f8bbb7

192
LICENSE
View File

@ -1,192 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
Copyright 2015-2020 Asim Aslam.
Copyright 2019-2020 Unistack LLC.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1,6 +1,6 @@
# HTTP Server
The HTTP Server is a go-micro.Server. It's a partial implementation which strips out codecs, transports, etc but enables you
The HTTP Server is a go-micro.Server. It's a partial implementation which strips out codecs, transports, etc but enables you
to create a HTTP Server that could potentially be used for REST based API services.
## Usage
@ -9,8 +9,8 @@ to create a HTTP Server that could potentially be used for REST based API servic
import (
"net/http"
"github.com/unistack-org/micro/v3/server"
httpServer "github.com/unistack-org/micro-server-http"
"github.com/micro/go-micro/server"
httpServer "github.com/micro/go-plugins/server/http"
)
func main() {
@ -37,9 +37,9 @@ Or as part of a service
import (
"net/http"
"github.com/unistack-org/micro/v3"
"github.com/unistack-org/micro/v3/server"
httpServer "github.com/unistack-org/micro-server-http"
"github.com/micro/go-micro"
"github.com/micro/go-micro/server"
httpServer "github.com/micro/go-plugins/server/http"
)
func main() {

14
buffer.go Normal file
View File

@ -0,0 +1,14 @@
package http
import (
"bytes"
)
type buffer struct {
*bytes.Buffer
}
func (b *buffer) Close() error {
b.Buffer.Reset()
return nil
}

161
extractor.go Normal file
View File

@ -0,0 +1,161 @@
package http
import (
"fmt"
"reflect"
"strconv"
"strings"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/util/addr"
)
func serviceDef(opts server.Options) *registry.Service {
var advt, host string
var port int
if len(opts.Advertise) > 0 {
advt = opts.Advertise
} else {
advt = opts.Address
}
parts := strings.Split(advt, ":")
if len(parts) > 1 {
host = strings.Join(parts[:len(parts)-1], ":")
port, _ = strconv.Atoi(parts[len(parts)-1])
} else {
host = parts[0]
}
addr, err := addr.Extract(host)
if err != nil {
addr = host
}
node := &registry.Node{
Id: opts.Name + "-" + opts.Id,
Address: fmt.Sprintf("%s:%d", addr, port),
Metadata: opts.Metadata,
}
node.Metadata["server"] = "http"
node.Metadata["broker"] = opts.Broker.String()
node.Metadata["registry"] = opts.Registry.String()
node.Metadata["protocol"] = "http"
return &registry.Service{
Name: opts.Name,
Version: opts.Version,
Nodes: []*registry.Node{node},
}
}
func extractValue(v reflect.Type, d int) *registry.Value {
if d == 3 {
return nil
}
if v == nil {
return nil
}
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
arg := &registry.Value{
Name: v.Name(),
Type: v.Name(),
}
switch v.Kind() {
case reflect.Struct:
for i := 0; i < v.NumField(); i++ {
f := v.Field(i)
val := extractValue(f.Type, d+1)
if val == nil {
continue
}
// if we can find a json tag use it
if tags := f.Tag.Get("json"); len(tags) > 0 {
parts := strings.Split(tags, ",")
val.Name = parts[0]
}
// if there's no name default it
if len(val.Name) == 0 {
val.Name = v.Field(i).Name
}
arg.Values = append(arg.Values, val)
}
case reflect.Slice:
p := v.Elem()
if p.Kind() == reflect.Ptr {
p = p.Elem()
}
arg.Type = "[]" + p.Name()
val := extractValue(v.Elem(), d+1)
if val != nil {
arg.Values = append(arg.Values, val)
}
}
return arg
}
func extractEndpoint(method reflect.Method) *registry.Endpoint {
if method.PkgPath != "" {
return nil
}
var rspType, reqType reflect.Type
var stream bool
mt := method.Type
switch mt.NumIn() {
case 3:
reqType = mt.In(1)
rspType = mt.In(2)
case 4:
reqType = mt.In(2)
rspType = mt.In(3)
default:
return nil
}
// are we dealing with a stream?
switch rspType.Kind() {
case reflect.Func, reflect.Interface:
stream = true
}
request := extractValue(reqType, 0)
response := extractValue(rspType, 0)
return &registry.Endpoint{
Name: method.Name,
Request: request,
Response: response,
Metadata: map[string]string{
"stream": fmt.Sprintf("%v", stream),
},
}
}
func extractSubValue(typ reflect.Type) *registry.Value {
var reqType reflect.Type
switch typ.NumIn() {
case 1:
reqType = typ.In(0)
case 2:
reqType = typ.In(1)
case 3:
reqType = typ.In(2)
default:
return nil
}
return extractValue(reqType, 0)
}

24
go.mod
View File

@ -1,23 +1,5 @@
module go.unistack.org/micro-server-http/v3
module github.com/micro/go-plugins/server/http
go 1.22.7
go 1.13
toolchain go1.23.3
require (
go.unistack.org/micro-client-http/v3 v3.9.14
go.unistack.org/micro-codec-yaml/v3 v3.10.2
go.unistack.org/micro-proto/v3 v3.4.1
go.unistack.org/micro/v3 v3.10.108
golang.org/x/net v0.31.0
)
require (
github.com/google/gnostic v0.7.0 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
golang.org/x/sys v0.27.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect
google.golang.org/grpc v1.68.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
require github.com/micro/go-micro v1.18.0

1924
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -1,715 +1,28 @@
package http
import (
"context"
"fmt"
"io"
"net/http"
"reflect"
"slices"
"strconv"
"strings"
"sync"
"time"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/tracer"
rhttp "go.unistack.org/micro/v3/util/http"
rflutil "go.unistack.org/micro/v3/util/reflect"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server"
)
var (
DefaultErrorHandler = func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int) {
w.WriteHeader(status)
if _, cerr := w.Write([]byte(err.Error())); cerr != nil {
logger.DefaultLogger.Error(ctx, "write error", cerr)
}
}
DefaultContentType = "application/json"
)
type patHandler struct {
mtype *methodType
rcvr reflect.Value
name string
}
type httpHandler struct {
opts server.HandlerOptions
hd interface{}
handlers *rhttp.Trie
name string
eps []*register.Endpoint
sopts server.Options
sync.RWMutex
opts server.HandlerOptions
eps []*registry.Endpoint
hd interface{}
}
func (h *httpHandler) Name() string {
return h.name
return "handler"
}
func (h *httpHandler) Handler() interface{} {
return h.hd
}
func (h *httpHandler) Endpoints() []*register.Endpoint {
func (h *httpHandler) Endpoints() []*registry.Endpoint {
return h.eps
}
func (h *httpHandler) Options() server.HandlerOptions {
return h.opts
}
func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) {
if handler == nil {
return nil, fmt.Errorf("invalid handler specified: %v", handler)
}
rtype := reflect.TypeOf(handler)
if rtype.NumIn() != 3 {
return nil, fmt.Errorf("invalid handler, NumIn != 3: %v", rtype.NumIn())
}
argType := rtype.In(1)
replyType := rtype.In(2)
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
return nil, fmt.Errorf("invalid handler, argument type not exported: %v", argType)
}
if replyType.Kind() != reflect.Ptr {
return nil, fmt.Errorf("invalid handler, reply type not a pointer: %v", replyType)
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
return nil, fmt.Errorf("invalid handler, reply type not exported: %v", replyType)
}
if rtype.NumOut() != 1 {
return nil, fmt.Errorf("invalid handler, has wrong number of outs: %v", rtype.NumOut())
}
// The return type of the method must be error.
if returnType := rtype.Out(0); returnType != typeOfError {
return nil, fmt.Errorf("invalid handler, returns %v not error", returnType.String())
}
return func(w http.ResponseWriter, r *http.Request) {
ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype
}
ctx := context.WithValue(r.Context(), rspCodeKey{}, &rspCodeVal{})
ctx = context.WithValue(ctx, rspHeaderKey{}, &rspHeaderVal{})
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(len(r.Header) + 8)
}
for k, v := range r.Header {
md[k] = strings.Join(v, ", ")
}
md["RemoteAddr"] = r.RemoteAddr
md["Method"] = r.Method
md["URL"] = r.URL.String()
md["Proto"] = r.Proto
md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength)
md["Transfer-Encoding"] = strings.Join(r.TransferEncoding, ",")
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
if r.TLS != nil {
md["TLS"] = "true"
md["TLS-ALPN"] = r.TLS.NegotiatedProtocol
md["TLS-ServerName"] = r.TLS.ServerName
}
ctx = metadata.NewIncomingContext(ctx, md)
path := r.URL.Path
if r.Body != nil {
defer r.Body.Close()
}
matches := make(map[string]interface{})
var match bool
var hldr *patHandler
var handler *httpHandler
for _, shdlr := range h.handlers {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(r.Method, path)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
break
} else if err == rhttp.ErrMethodNotAllowed && !h.registerRPC {
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = w.Write([]byte("not matching route found"))
return
}
}
if !match && h.registerRPC {
microMethod, mok := md.Get(metadata.HeaderEndpoint)
if mok {
serviceMethod := strings.Split(microMethod, ".")
if len(serviceMethod) == 2 {
if shdlr, ok := h.handlers[serviceMethod[0]]; ok {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(http.MethodPost, "/"+microMethod)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
}
}
}
}
}
// get fields from url values
if len(r.URL.RawQuery) > 0 {
umd, cerr := rflutil.URLMap(r.URL.RawQuery)
if cerr != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(cerr.Error()))
return
}
for k, v := range umd {
matches[k] = v
}
}
cf, err := h.newCodec(ct)
if err != nil {
w.WriteHeader(http.StatusUnsupportedMediaType)
return
}
var argv, replyv reflect.Value
// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if hldr.mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(hldr.mtype.ArgType.Elem())
} else {
argv = reflect.New(hldr.mtype.ArgType)
argIsValue = true
}
if argIsValue {
argv = argv.Elem()
}
// reply value
replyv = reflect.New(hldr.mtype.ReplyType.Elem())
function := hldr.mtype.method.Func
var returnValues []reflect.Value
if r.Body != nil {
var buf []byte
buf, err = io.ReadAll(r.Body)
if err != nil && err != io.EOF {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
}
matches = rflutil.FlattenMap(matches)
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
hr := &rpcRequest{
codec: cf,
service: handler.sopts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
endpoint: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
payload: argv.Interface(),
header: md,
}
// define the handler func
fn := func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(fctx), argv, reflect.ValueOf(rsp)})
// The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil {
err = rerr.(error)
}
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd {
md.Set(k, v)
}
}
metadata.SetOutgoingContext(ctx, md)
return err
}
// wrap the handler func
h.opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookHandler); ok {
fn = h(fn)
}
})
if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType)
if err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
ct = DefaultContentType
}
scode := int(200)
appErr := fn(ctx, hr, replyv.Interface())
w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md {
w.Header().Set(k, v)
}
}
if md := getRspHeader(ctx); md != nil {
for k, v := range md {
for _, vv := range v {
w.Header().Add(k, vv)
}
}
}
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
if cf, err = h.newCodec(nct); err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusInternalServerError)
return
}
}
var buf []byte
if appErr != nil {
switch verr := appErr.(type) {
case *errors.Error:
scode = int(verr.Code)
buf, err = cf.Marshal(verr)
case *Error:
buf, err = cf.Marshal(verr.err)
default:
buf, err = cf.Marshal(appErr)
}
} else {
buf, err = cf.Marshal(replyv.Interface())
}
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Error(handler.sopts.Context, "handler error", err)
return
}
if nscode := GetRspCode(ctx); nscode != 0 {
scode = nscode
}
w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Error(ctx, "write failed", cerr)
}
}, nil
}
func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype
}
ts := time.Now()
ctx := context.WithValue(r.Context(), rspCodeKey{}, &rspCodeVal{})
ctx = context.WithValue(ctx, rspHeaderKey{}, &rspHeaderVal{})
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(len(r.Header) + 8)
}
for k, v := range r.Header {
md[k] = strings.Join(v, ", ")
}
md["RemoteAddr"] = r.RemoteAddr
if r.TLS != nil {
md["Scheme"] = "https"
} else {
md["Scheme"] = "http"
}
md["Method"] = r.Method
md["URL"] = r.URL.String()
md["Proto"] = r.Proto
md["ContentLength"] = fmt.Sprintf("%d", r.ContentLength)
if len(r.TransferEncoding) > 0 {
md["TransferEncoding"] = strings.Join(r.TransferEncoding, ",")
}
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
ctx = metadata.NewIncomingContext(ctx, md)
ctx = metadata.NewOutgoingContext(ctx, metadata.New(0))
path := r.URL.Path
if !strings.HasPrefix(path, "/") {
h.errorHandler(ctx, nil, w, r, fmt.Errorf("path must starts with /"), http.StatusBadRequest)
return
}
matches := make(map[string]interface{})
var match bool
var hldr *patHandler
var handler *httpHandler
for _, shdlr := range h.handlers {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(r.Method, path)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
break
} else if err == rhttp.ErrMethodNotAllowed && !h.registerRPC {
h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusMethodNotAllowed)
return
}
}
if !match && h.registerRPC {
microMethod, mok := md.Get(metadata.HeaderEndpoint)
if mok {
serviceMethod := strings.Split(microMethod, ".")
if len(serviceMethod) == 2 {
if shdlr, ok := h.handlers[serviceMethod[0]]; ok {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(http.MethodPost, "/"+microMethod)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
}
}
}
}
}
var sp tracer.Span
if !match && h.hd != nil {
if hdlr, ok := h.hd.Handler().(http.Handler); ok {
endpointName := fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name)
if !slices.Contains(tracer.DefaultSkipEndpoints, endpointName) {
ctx, sp = h.opts.Tracer.Start(ctx, "rpc-server",
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", endpointName,
),
)
defer func() {
n := GetRspCode(ctx)
if s, _ := sp.Status(); s != tracer.SpanStatusError && n > 399 {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(n))
}
sp.Finish()
}()
}
if !slices.Contains(meter.DefaultSkipEndpoints, endpointName) {
h.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", endpointName, "server", "http").Inc()
defer func() {
n := GetRspCode(ctx)
if n > 399 {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", endpointName, "server", "http", "status", "success", "code", strconv.Itoa(n)).Inc()
} else {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", endpointName, "server", "http", "status", "failure", "code", strconv.Itoa(n)).Inc()
}
te := time.Since(ts)
h.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", endpointName, "server", "http").Update(te.Seconds())
h.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", endpointName, "server", "http").Update(te.Seconds())
h.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", endpointName, "server", "http").Dec()
}()
}
hdlr.ServeHTTP(w, r.WithContext(ctx))
return
}
} else if !match {
// check for http.HandlerFunc handlers
if !slices.Contains(tracer.DefaultSkipEndpoints, r.URL.Path) {
ctx, sp = h.opts.Tracer.Start(ctx, "rpc-server",
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", r.URL.Path,
"server", "http",
),
)
defer func() {
if n := GetRspCode(ctx); n > 399 {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(n))
} else {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(http.StatusNotFound))
}
sp.Finish()
}()
}
if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r.WithContext(ctx))
return
}
h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound)
return
}
endpointName := fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name)
topts := []tracer.SpanOption{
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", endpointName,
"server", "http",
),
}
if slices.Contains(tracer.DefaultSkipEndpoints, endpointName) {
topts = append(topts, tracer.WithSpanRecord(false))
}
ctx, sp = h.opts.Tracer.Start(ctx, "rpc-server", topts...)
if !slices.Contains(meter.DefaultSkipEndpoints, handler.name) {
defer func() {
te := time.Since(ts)
h.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", handler.name, "server", "http").Update(te.Seconds())
h.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", handler.name, "server", "http").Update(te.Seconds())
h.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", handler.name, "server", "http").Dec()
n := GetRspCode(ctx)
if n > 399 {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", handler.name, "server", "http", "status", "failure", "code", strconv.Itoa(n)).Inc()
} else {
h.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", handler.name, "server", "http", "status", "success", "code", strconv.Itoa(n)).Inc()
}
}()
}
defer func() {
n := GetRspCode(ctx)
if n > 399 {
if s, _ := sp.Status(); s != tracer.SpanStatusError {
sp.SetStatus(tracer.SpanStatusError, http.StatusText(n))
}
}
sp.Finish()
}()
// get fields from url values
if len(r.URL.RawQuery) > 0 {
umd, cerr := rflutil.URLMap(r.URL.RawQuery)
if cerr != nil {
h.errorHandler(ctx, handler, w, r, cerr, http.StatusBadRequest)
return
}
for k, v := range umd {
matches[k] = v
}
}
if r.Body != nil {
defer r.Body.Close()
}
cf, err := h.newCodec(ct)
if err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
return
}
var argv, replyv reflect.Value
// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if hldr.mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(hldr.mtype.ArgType.Elem())
} else {
argv = reflect.New(hldr.mtype.ArgType)
argIsValue = true
}
if argIsValue {
argv = argv.Elem()
}
// reply value
replyv = reflect.New(hldr.mtype.ReplyType.Elem())
function := hldr.mtype.method.Func
var returnValues []reflect.Value
if r.Body != nil {
var buf []byte
buf, err = io.ReadAll(r.Body)
if err != nil && err != io.EOF {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
}
if len(matches) > 0 {
matches = rflutil.FlattenMap(matches)
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
}
hr := &rpcRequest{
codec: cf,
service: handler.sopts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
endpoint: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
payload: argv.Interface(),
header: md,
}
// define the handler func
fn := func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(fctx), argv, reflect.ValueOf(rsp)})
// The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil {
err = rerr.(error)
}
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd {
md.Set(k, v)
}
}
metadata.SetOutgoingContext(ctx, md)
if err != nil && sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
return err
}
h.opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookHandler); ok {
fn = h(fn)
}
})
if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType)
if err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
ct = DefaultContentType
}
scode := int(200)
appErr := fn(ctx, hr, replyv.Interface())
w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md {
w.Header().Set(k, v)
}
}
if md := getRspHeader(ctx); md != nil {
for k, v := range md {
for _, vv := range v {
w.Header().Add(k, vv)
}
}
}
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
if cf, err = h.newCodec(nct); err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusInternalServerError)
return
}
}
var buf []byte
if appErr != nil {
switch verr := appErr.(type) {
case *errors.Error:
scode = int(verr.Code)
buf, err = cf.Marshal(verr)
case *Error:
buf, err = cf.Marshal(verr.err)
default:
buf, err = cf.Marshal(appErr)
}
} else {
buf, err = cf.Marshal(replyv.Interface())
}
if err != nil {
if handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Error(handler.sopts.Context, "handler error", err)
}
scode = http.StatusInternalServerError
} else if nscode := GetRspCode(ctx); nscode != 0 {
scode = nscode
}
w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Error(ctx, "respoonse write error", cerr)
}
}

View File

@ -1,12 +0,0 @@
package handler
import (
// import required packages
_ "go.unistack.org/micro-proto/v3/openapiv3"
)
//go:generate sh -c "curl -L https://github.com/swagger-api/swagger-ui/archive/refs/tags/v4.18.3.zip -o - | bsdtar -C swagger-ui --strip-components=2 -xv swagger-ui-4.18.3/dist && rm swagger-ui/*.map swagger-ui/*-es-*.js swagger-ui/swagger-ui.js swagger-ui/swagger-initializer.js"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./meter/meter.proto"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./health/health.proto"

View File

@ -1,77 +0,0 @@
//go:build ignore
package graphql_handler
import (
"context"
"fmt"
"github.com/99designs/gqlgen/graphql"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/store"
)
var _ graphql.Cache = (*cacheWrapper)(nil)
type Handler struct {
opts Options
}
type Option func(*Options)
type Options struct {
cache *cacheWrapper
Path string
}
type cacheWrapper struct {
s store.Store
l logger.Logger
}
func (c *cacheWrapper) Get(ctx context.Context, key string) (interface{}, bool) {
var val interface{}
if err := c.s.Read(ctx, key, val); err != nil && err != store.ErrNotFound {
c.l.Error(ctx, fmt.Sprintf("cache.Get %s failed", key), err)
return nil, false
}
return val, true
}
func (c *cacheWrapper) Add(ctx context.Context, key string, val interface{}) {
if err := c.s.Write(ctx, key, val); err != nil {
c.l.Error(ctx, fmt.Sprintf("cache.Add %s failed", key), err)
}
}
func Store(s store.Store) Option {
return func(o *Options) {
if o.cache == nil {
o.cache = &cacheWrapper{}
}
o.cache.s = s
}
}
func Logger(l logger.Logger) Option {
return func(o *Options) {
if o.cache == nil {
o.cache = &cacheWrapper{}
}
o.cache.l = l
}
}
func Path(path string) Option {
return func(o *Options) {
o.Path = path
}
}
func NewHandler(opts ...Option) *Handler {
options := Options{}
for _, o := range opts {
o(&options)
}
return &Handler{opts: options}
}

View File

@ -1,136 +0,0 @@
package health_handler
import (
"context"
codecpb "go.unistack.org/micro-proto/v3/codec"
"go.unistack.org/micro/v3/errors"
)
var _ HealthServiceServer = &Handler{}
type Handler struct {
opts Options
}
type (
CheckFunc func(context.Context) error
Option func(*Options)
)
type Stater interface {
Live() bool
Ready() bool
Health() bool
}
type Options struct {
Version string
Name string
Staters []Stater
LiveChecks []CheckFunc
ReadyChecks []CheckFunc
HealthChecks []CheckFunc
}
func Service(s ...Stater) Option {
return func(o *Options) {
o.Staters = append(o.Staters, s...)
}
}
func LiveChecks(fns ...CheckFunc) Option {
return func(o *Options) {
o.LiveChecks = append(o.LiveChecks, fns...)
}
}
func ReadyChecks(fns ...CheckFunc) Option {
return func(o *Options) {
o.ReadyChecks = append(o.ReadyChecks, fns...)
}
}
func HealthChecks(fns ...CheckFunc) Option {
return func(o *Options) {
o.HealthChecks = append(o.HealthChecks, fns...)
}
}
func Name(name string) Option {
return func(o *Options) {
o.Name = name
}
}
func Version(version string) Option {
return func(o *Options) {
o.Version = version
}
}
func NewHandler(opts ...Option) *Handler {
options := Options{}
for _, o := range opts {
o(&options)
}
return &Handler{opts: options}
}
func (h *Handler) Healthy(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
var err error
for _, s := range h.opts.Staters {
if !s.Health() {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
for _, fn := range h.opts.HealthChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
return nil
}
func (h *Handler) Live(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
var err error
for _, s := range h.opts.Staters {
if !s.Live() {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
for _, fn := range h.opts.LiveChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
return nil
}
func (h *Handler) Ready(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
var err error
for _, s := range h.opts.Staters {
if !s.Ready() {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
for _, fn := range h.opts.ReadyChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
return nil
}
func (h *Handler) Version(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
rsp.Data = []byte(h.opts.Version)
return nil
}

View File

@ -1,71 +0,0 @@
syntax = "proto3";
package micro.server.http.v3.handler.health;
option go_package = "go.unistack.org/micro-server-http/v3/handler/health;health_handler";
import "api/annotations.proto";
import "openapiv3/annotations.proto";
import "codec/frame.proto";
service HealthService {
rpc Healthy(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Healthy";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = {
get: "/health";
additional_bindings: { get: "/healthz"; }
};
};
rpc Live(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Live";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = {
get: "/live";
additional_bindings: { get: "/livez"; }
};
};
rpc Ready(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Ready";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = { get: "/ready";
additional_bindings: { get: "/readyz"; }
};
};
rpc Version(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Version";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = { get: "/version"; };
};
};

View File

@ -1,31 +0,0 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions:
// - protoc-gen-go-micro v3.10.4
// - protoc v5.28.3
// source: health/health.proto
package health_handler
import (
context "context"
codec "go.unistack.org/micro-proto/v3/codec"
client "go.unistack.org/micro/v3/client"
)
var (
HealthServiceName = "HealthService"
)
type HealthServiceClient interface {
Healthy(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
Live(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
Ready(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
Version(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
}
type HealthServiceServer interface {
Healthy(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}

View File

@ -1,186 +0,0 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.10.4
// source: health/health.proto
package health_handler
import (
context "context"
v31 "go.unistack.org/micro-client-http/v3"
codec "go.unistack.org/micro-proto/v3/codec"
v3 "go.unistack.org/micro-server-http/v3"
client "go.unistack.org/micro/v3/client"
server "go.unistack.org/micro/v3/server"
http "net/http"
)
var (
HealthServiceServerEndpoints = []v3.EndpointMetadata{
{
Name: "HealthService.Healthy",
Path: "/health",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Healthy",
Path: "/healthz",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Live",
Path: "/livez",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/readyz",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type healthServiceClient struct {
c client.Client
name string
}
func NewHealthServiceClient(name string, c client.Client) HealthServiceClient {
return &healthServiceClient{c: c, name: name}
}
func (c *healthServiceClient) Healthy(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/health"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Healthy", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
func (c *healthServiceClient) Live(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/live"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Live", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
func (c *healthServiceClient) Ready(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/ready"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Ready", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
func (c *healthServiceClient) Version(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/version"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "HealthService.Version", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
type healthServiceServer struct {
HealthServiceServer
}
func (h *healthServiceServer) Healthy(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServiceServer.Healthy(ctx, req, rsp)
}
func (h *healthServiceServer) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServiceServer.Live(ctx, req, rsp)
}
func (h *healthServiceServer) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServiceServer.Ready(ctx, req, rsp)
}
func (h *healthServiceServer) Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServiceServer.Version(ctx, req, rsp)
}
func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts ...server.HandlerOption) error {
type healthService interface {
Healthy(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}
type HealthService struct {
healthService
}
h := &healthServiceServer{sh}
var nopts []server.HandlerOption
nopts = append(nopts, v3.HandlerEndpoints(HealthServiceServerEndpoints))
return s.Handle(s.NewHandler(&HealthService{h}, append(nopts, opts...)...))
}

View File

@ -1,132 +0,0 @@
package meter_handler
import (
"bytes"
"compress/gzip"
"context"
"io"
"strings"
"sync"
codecpb "go.unistack.org/micro-proto/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
)
const (
contentEncodingHeader = "Content-Encoding"
acceptEncodingHeader = "Accept-Encoding"
)
var gzipPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
// guard to fail early
var _ MeterServiceServer = &Handler{}
type Handler struct {
Options Options
}
type Option func(*Options)
type Options struct {
Meter meter.Meter
Name string
MeterOptions []meter.Option
DisableCompress bool
}
func Meter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
func Name(name string) Option {
return func(o *Options) {
o.Name = name
}
}
func DisableCompress(g bool) Option {
return func(o *Options) {
o.DisableCompress = g
}
}
func MeterOptions(opts ...meter.Option) Option {
return func(o *Options) {
o.MeterOptions = append(o.MeterOptions, opts...)
}
}
func NewOptions(opts ...Option) Options {
options := Options{Meter: meter.DefaultMeter, DisableCompress: false}
for _, o := range opts {
o(&options)
}
return options
}
func NewHandler(opts ...Option) *Handler {
options := NewOptions(opts...)
return &Handler{Options: options}
}
func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
log, ok := logger.FromContext(ctx)
if !ok {
log = logger.DefaultLogger
}
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
w := io.Writer(buf)
if md, ok := metadata.FromOutgoingContext(ctx); gzipAccepted(md) && ok && !h.Options.DisableCompress {
omd, _ := metadata.FromOutgoingContext(ctx)
omd.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
gz.Flush()
}
if err := h.Options.Meter.Write(w, h.Options.MeterOptions...); err != nil {
log.Error(ctx, "http/meter write failed", err)
return nil
}
rsp.Data = buf.Bytes()
return nil
}
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(md metadata.Metadata) bool {
a, ok := md.Get(acceptEncodingHeader)
if !ok {
return false
}
if strings.Contains(a, "gzip") {
return true
}
return false
}

View File

@ -1,24 +0,0 @@
syntax = "proto3";
package micro.server.http.v3.handler.meter;
option go_package = "go.unistack.org/micro-server-http/v3/handler/meter;meter_handler";
import "api/annotations.proto";
import "openapiv3/annotations.proto";
import "codec/frame.proto";
service MeterService {
rpc Metrics(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv3.openapiv3_operation) = {
operation_id: "Metrics";
responses: {
default: {
reference: {
_ref: "micro.codec.Frame";
};
};
};
};
option (micro.api.http) = { get: "/metrics"; };
};
};

View File

@ -1,25 +0,0 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions:
// - protoc-gen-go-micro v3.10.4
// - protoc v5.28.3
// source: meter/meter.proto
package meter_handler
import (
context "context"
codec "go.unistack.org/micro-proto/v3/codec"
client "go.unistack.org/micro/v3/client"
)
var (
MeterServiceName = "MeterService"
)
type MeterServiceClient interface {
Metrics(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error)
}
type MeterServiceServer interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}

View File

@ -1,75 +0,0 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.10.4
// source: meter/meter.proto
package meter_handler
import (
context "context"
v31 "go.unistack.org/micro-client-http/v3"
codec "go.unistack.org/micro-proto/v3/codec"
v3 "go.unistack.org/micro-server-http/v3"
client "go.unistack.org/micro/v3/client"
server "go.unistack.org/micro/v3/server"
http "net/http"
)
var (
MeterServiceServerEndpoints = []v3.EndpointMetadata{
{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type meterServiceClient struct {
c client.Client
name string
}
func NewMeterServiceClient(name string, c client.Client) MeterServiceClient {
return &meterServiceClient{c: c, name: name}
}
func (c *meterServiceClient) Metrics(ctx context.Context, req *codec.Frame, opts ...client.CallOption) (*codec.Frame, error) {
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codec.Frame{}
opts = append(opts,
v31.ErrorMap(errmap),
)
opts = append(opts,
v31.Method(http.MethodGet),
v31.Path("/metrics"),
)
rsp := &codec.Frame{}
err := c.c.Call(ctx, c.c.NewRequest(c.name, "MeterService.Metrics", req), rsp, opts...)
if err != nil {
return nil, err
}
return rsp, nil
}
type meterServiceServer struct {
MeterServiceServer
}
func (h *meterServiceServer) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.MeterServiceServer.Metrics(ctx, req, rsp)
}
func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...server.HandlerOption) error {
type meterService interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}
type MeterService struct {
meterService
}
h := &meterServiceServer{sh}
var nopts []server.HandlerOption
nopts = append(nopts, v3.HandlerEndpoints(MeterServiceServerEndpoints))
return s.Handle(s.NewHandler(&MeterService{h}, append(nopts, opts...)...))
}

View File

@ -1,46 +0,0 @@
package pprof_handler
import (
"expvar"
"net/http"
"net/http/pprof"
"path"
"strings"
)
func NewHandler(prefixPath string, initFuncs ...func()) http.HandlerFunc {
for _, fn := range initFuncs {
fn()
}
return func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.EqualFold(r.RequestURI, prefixPath) && r.RequestURI[len(r.RequestURI)-1] != '/':
http.Redirect(w, r, r.RequestURI+"/", http.StatusMovedPermanently)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "cmdline")):
pprof.Cmdline(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "profile")):
pprof.Profile(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "symbol")):
pprof.Symbol(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "trace")):
pprof.Trace(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "goroutine")):
pprof.Handler("goroutine").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "threadcreate")):
pprof.Handler("threadcreate").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "mutex")):
pprof.Handler("mutex").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "heap")):
pprof.Handler("heap").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "block")):
pprof.Handler("block").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "allocs")):
pprof.Handler("allocs").ServeHTTP(w, r)
case strings.HasPrefix(r.RequestURI, path.Join(prefixPath, "vars")):
expvar.Handler().ServeHTTP(w, r)
default:
pprof.Index(w, r)
}
}
}

View File

@ -1,19 +0,0 @@
package spa
import (
"io/fs"
"net/http"
"strings"
)
// Handler serve files from dir and redirect to index if file not exists
var Handler = func(prefix string, dir fs.FS) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
f := http.StripPrefix(prefix, http.FileServer(http.FS(dir)))
if _, err := fs.Stat(dir, strings.TrimPrefix(r.RequestURI, prefix)); err != nil {
r.RequestURI = prefix
r.URL.Path = prefix
}
f.ServeHTTP(w, r)
}
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 665 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 628 B

View File

@ -1,16 +0,0 @@
html {
box-sizing: border-box;
overflow: -moz-scrollbars-vertical;
overflow-y: scroll;
}
*,
*:before,
*:after {
box-sizing: inherit;
}
body {
margin: 0;
background: #fafafa;
}

View File

@ -1,19 +0,0 @@
<!-- HTML for static distribution bundle build -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Swagger UI</title>
<link rel="stylesheet" type="text/css" href="./swagger-ui.css" />
<link rel="stylesheet" type="text/css" href="index.css" />
<link rel="icon" type="image/png" href="./favicon-32x32.png" sizes="32x32" />
<link rel="icon" type="image/png" href="./favicon-16x16.png" sizes="16x16" />
</head>
<body>
<div id="swagger-ui"></div>
<script src="./swagger-ui-bundle.js" charset="UTF-8"> </script>
<script src="./swagger-ui-standalone-preset.js" charset="UTF-8"> </script>
<script src="./swagger-initializer.js" charset="UTF-8"> </script>
</body>
</html>

View File

@ -1,79 +0,0 @@
<!doctype html>
<html lang="en-US">
<head>
<title>Swagger UI: OAuth2 Redirect</title>
</head>
<body>
<script>
'use strict';
function run () {
var oauth2 = window.opener.swaggerUIRedirectOauth2;
var sentState = oauth2.state;
var redirectUrl = oauth2.redirectUrl;
var isValid, qp, arr;
if (/code|token|error/.test(window.location.hash)) {
qp = window.location.hash.substring(1).replace('?', '&');
} else {
qp = location.search.substring(1);
}
arr = qp.split("&");
arr.forEach(function (v,i,_arr) { _arr[i] = '"' + v.replace('=', '":"') + '"';});
qp = qp ? JSON.parse('{' + arr.join() + '}',
function (key, value) {
return key === "" ? value : decodeURIComponent(value);
}
) : {};
isValid = qp.state === sentState;
if ((
oauth2.auth.schema.get("flow") === "accessCode" ||
oauth2.auth.schema.get("flow") === "authorizationCode" ||
oauth2.auth.schema.get("flow") === "authorization_code"
) && !oauth2.auth.code) {
if (!isValid) {
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "warning",
message: "Authorization may be unsafe, passed state was changed in server. The passed state wasn't returned from auth server."
});
}
if (qp.code) {
delete oauth2.state;
oauth2.auth.code = qp.code;
oauth2.callback({auth: oauth2.auth, redirectUrl: redirectUrl});
} else {
let oauthErrorMsg;
if (qp.error) {
oauthErrorMsg = "["+qp.error+"]: " +
(qp.error_description ? qp.error_description+ ". " : "no accessCode received from the server. ") +
(qp.error_uri ? "More info: "+qp.error_uri : "");
}
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "error",
message: oauthErrorMsg || "[Authorization failed]: no accessCode received from the server."
});
}
} else {
oauth2.callback({auth: oauth2.auth, token: qp, isValid: isValid, redirectUrl: redirectUrl});
}
window.close();
}
if (document.readyState !== 'loading') {
run();
} else {
document.addEventListener('DOMContentLoaded', function () {
run();
});
}
</script>
</body>
</html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,142 +0,0 @@
package swaggerui_handler
import (
"embed"
"html/template"
"net/http"
"path"
"reflect"
)
//go:embed *.js *.css *.html *.png
var assets embed.FS
var (
Handler = func(prefix string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet || path.Base(r.URL.Path) != "swagger-initializer.js" {
http.StripPrefix(prefix, http.FileServer(http.FS(assets))).ServeHTTP(w, r)
return
}
tpl := template.New("swagger-initializer.js").Funcs(TemplateFuncs)
ptpl, err := tpl.Parse(Template)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if err := ptpl.Execute(w, Config); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
}
}
TemplateFuncs = template.FuncMap{
"isInt": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Int, reflect.Int8, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64:
return true
default:
return false
}
},
"isBool": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Bool:
return true
default:
return false
}
},
"isString": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.String:
return true
default:
return false
}
},
"isSlice": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Slice:
return true
default:
return false
}
},
"isMap": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Map:
return true
default:
return false
}
},
}
Template = `
window.onload = function() {
//<editor-fold desc="Changeable Configuration Block">
window.ui = SwaggerUIBundle({
{{- range $k, $v := . }}
{{- if (eq (printf "%s" $v) "") -}}
{{- continue -}}
{{ end }}
{{ $k }}: {{ if isBool $v -}}
{{- $v -}},
{{- else if isInt $v -}}
{{- $v -}},
{{- else if isString $v -}}
"{{- $v -}}",
{{- else if and (isSlice $v) (or (eq (printf "%s" $k) "presets") (eq (printf "%s" $k) "plugins")) -}}
[
{{- range $v }}
{{ . }},
{{- end }}
],
{{- end -}}
{{ end }}
});
//</editor-fold>
};`
Config = map[string]interface{}{
"configUrl": "",
"dom_id": "#swagger-ui",
/*
"domNode": "",
"spec": "",
"urls": []interface{}{
map[string]interface{}{
"url": "",
"name": "",
},
},
},
*/
"url": "https://petstore.swagger.io/v2/swagger.json",
"deepLinking": true,
"displayOperationId": false,
"defaultModelsExpandDepth": 1,
"defaultModelExpandDepth": 1,
"displayRequestDuration": true,
"filter": true,
"operationsSorter": "alpha",
"showExtensions": true,
"tryItOutEnabled": true,
"presets": []string{
"SwaggerUIBundle.presets.apis",
"SwaggerUIStandalonePreset",
},
"plugins": []string{
"SwaggerUIBundle.plugins.DownloadUrl",
},
"layout": "StandaloneLayout",
}
)

View File

@ -1,15 +0,0 @@
package swaggerui_handler
import (
"net/http"
"testing"
)
func TestTemplate(t *testing.T) {
t.Skip()
h := http.NewServeMux()
h.HandleFunc("/", Handler(""))
if err := http.ListenAndServe(":8080", h); err != nil {
t.Fatal(err)
}
}

View File

@ -1,61 +0,0 @@
package swagger_handler
import (
"io/fs"
"net/http"
yamlcodec "go.unistack.org/micro-codec-yaml/v3"
rutil "go.unistack.org/micro/v3/util/reflect"
)
// Handler append to generated swagger data from dst map[string]interface{}
var Handler = func(dst map[string]interface{}, fsys fs.FS) http.HandlerFunc {
c := yamlcodec.NewCodec()
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotFound)
return
}
path := r.URL.Path
if len(path) > 1 && path[0] == '/' {
path = path[1:]
}
buf, err := fs.ReadFile(fsys, path)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if dst == nil {
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
return
}
var src interface{}
if err = c.Unmarshal(buf, src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if err = rutil.Merge(src, dst); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if buf, err = c.Marshal(src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
}
}

676
http.go
View File

@ -1,323 +1,118 @@
// Package http implements a go-micro.Server
package http // import "go.unistack.org/micro-server-http/v3"
package http
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
rhttp "go.unistack.org/micro/v3/util/http"
"golang.org/x/net/netutil"
log "github.com/micro/go-log"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server"
)
var _ server.Server = (*Server)(nil)
var (
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
)
type Server struct {
hd server.Handler
rsvc *register.Service
handlers map[string]server.Handler
exit chan chan error
subscribers map[*httpSubscriber][]broker.Subscriber
errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int)
pathHandlers *rhttp.Trie
type httpServer struct {
sync.Mutex
opts server.Options
stateLive *atomic.Uint32
stateReady *atomic.Uint32
stateHealth *atomic.Uint32
registerRPC bool
sync.RWMutex
hd server.Handler
exit chan chan error
registerOnce sync.Once
subscribers map[*httpSubscriber][]broker.Subscriber
// used for first registration
registered bool
init bool
}
func (h *Server) newCodec(ct string) (codec.Codec, error) {
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}
h.RLock()
cf, ok := h.opts.Codecs[ct]
h.RUnlock()
if ok {
func init() {
cmd.DefaultServers["http"] = NewServer
}
func (h *httpServer) newCodec(contentType string) (codec.NewCodec, error) {
if cf, ok := h.opts.Codecs[contentType]; ok {
return cf, nil
}
return nil, codec.ErrUnknownContentType
if cf, ok := defaultCodecs[contentType]; ok {
return cf, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
}
func (h *Server) Options() server.Options {
func (h *httpServer) Options() server.Options {
h.Lock()
opts := h.opts
h.Unlock()
return opts
}
func (h *Server) Init(opts ...server.Option) error {
if len(opts) == 0 && h.init {
return nil
}
func (h *httpServer) Init(opts ...server.Option) error {
h.Lock()
for _, o := range opts {
o(&h.opts)
}
if fn, ok := h.opts.Context.Value(errorHandlerKey{}).(func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)); ok && fn != nil {
h.errorHandler = fn
}
if h.handlers == nil {
h.handlers = make(map[string]server.Handler)
}
if h.pathHandlers == nil {
h.pathHandlers = rhttp.NewTrie()
}
if v, ok := h.opts.Context.Value(registerRPCHandlerKey{}).(bool); ok {
h.registerRPC = v
}
if phs, ok := h.opts.Context.Value(pathHandlerKey{}).(*pathHandlerVal); ok && phs.h != nil {
for pm, ps := range phs.h {
for pp, ph := range ps {
if err := h.pathHandlers.Insert([]string{pm}, pp, ph); err != nil {
h.Unlock()
return err
}
}
}
}
h.Unlock()
h.RLock()
if err := h.opts.Register.Init(); err != nil {
h.RUnlock()
return err
}
if err := h.opts.Broker.Init(); err != nil {
h.RUnlock()
return err
}
if err := h.opts.Tracer.Init(); err != nil {
h.RUnlock()
return err
}
if err := h.opts.Logger.Init(); err != nil {
h.RUnlock()
return err
}
if err := h.opts.Meter.Init(); err != nil {
h.RUnlock()
return err
}
h.RUnlock()
h.Lock()
h.init = true
h.Unlock()
return nil
}
func (h *Server) Handle(handler server.Handler) error {
// passed unknown handler
hdlr, ok := handler.(*httpHandler)
if !ok {
h.Lock()
h.hd = handler
h.Unlock()
return nil
func (h *httpServer) Handle(handler server.Handler) error {
if _, ok := handler.Handler().(http.Handler); !ok {
return errors.New("Handle requires http.Handler")
}
// passed http.Handler like some muxer
if _, ok := hdlr.hd.(http.Handler); ok {
h.Lock()
h.hd = handler
h.Unlock()
return nil
}
// passed micro compat handler
h.Lock()
if h.handlers == nil {
h.handlers = make(map[string]server.Handler)
}
h.handlers[handler.Name()] = handler
h.hd = handler
h.Unlock()
return nil
}
func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.NewHandlerOptions(opts...)
eps := make([]*register.Endpoint, 0, len(options.Metadata))
for name, metadata := range options.Metadata {
eps = append(eps, &register.Endpoint{
Name: name,
Metadata: metadata,
})
func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.HandlerOptions{
Metadata: make(map[string]map[string]string),
}
hdlr := &httpHandler{
eps: eps,
hd: handler,
opts: options,
sopts: h.opts,
handlers: rhttp.NewTrie(),
for _, o := range opts {
o(&options)
}
tp := reflect.TypeOf(handler)
var eps []*registry.Endpoint
/*
if len(options.Metadata) == 0 {
if h.registerRPC {
h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn)
}
}
}
*/
registerCORS := false
if v, ok := options.Context.Value(registerCORSHandlerKey{}).(bool); ok && v {
registerCORS = true
}
for hn, md := range options.Metadata {
var method reflect.Method
mname := hn[strings.Index(hn, ".")+1:]
for m := 0; m < tp.NumMethod(); m++ {
mn := tp.Method(m)
if mn.Name != mname {
continue
}
method = mn
break
}
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname))
continue
}
mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, "endpoint error", err)
continue
} else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
continue
}
rcvr := reflect.ValueOf(handler)
name := reflect.Indirect(rcvr).Type().Name()
pth := &patHandler{mtype: mtype, name: name, rcvr: rcvr}
hdlr.name = name
methods := []string{md["Method"]}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
if err := hdlr.handlers.Insert(methods, md["Path"], pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %v %s", methods, md["Path"]))
}
if h.registerRPC {
methods := []string{http.MethodPost}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
if err := hdlr.handlers.Insert(methods, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
}
if !options.Internal {
for name, metadata := range options.Metadata {
eps = append(eps, &registry.Endpoint{
Name: name,
Metadata: metadata,
})
}
}
metadata, ok := options.Context.Value(handlerEndpointsKey{}).([]EndpointMetadata)
if !ok {
return hdlr
return &httpHandler{
eps: eps,
hd: handler,
opts: options,
}
for _, md := range metadata {
hn := md.Name
var method reflect.Method
mname := hn[strings.Index(hn, ".")+1:]
for m := 0; m < tp.NumMethod(); m++ {
mn := tp.Method(m)
if mn.Name != mname {
continue
}
method = mn
break
}
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname))
continue
}
mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, "prepare endpoint error", err)
continue
} else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
continue
}
rcvr := reflect.ValueOf(handler)
name := reflect.Indirect(rcvr).Type().Name()
pth := &patHandler{mtype: mtype, name: name, rcvr: rcvr}
hdlr.name = name
methods := []string{md.Method}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
if err := hdlr.handlers.Insert(methods, md.Path, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %s %s", md.Method, md.Path))
}
if h.registerRPC {
methods := []string{http.MethodPost}
if registerCORS {
methods = append(methods, http.MethodOptions)
}
h.opts.Logger.Info(h.opts.Context, fmt.Sprintf("register rpc handler for http.MethodPost %s /%s", hn, hn))
if err := hdlr.handlers.Insert(methods, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
}
}
}
return hdlr
}
func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, handler, opts...)
}
func (h *Server) Subscribe(sb server.Subscriber) error {
func (h *httpServer) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*httpSubscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
@ -326,161 +121,103 @@ func (h *Server) Subscribe(sb server.Subscriber) error {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := server.ValidateSubscriber(sb); err != nil {
if err := validateSubscriber(sb); err != nil {
return err
}
h.RLock()
h.Lock()
defer h.Unlock()
_, ok = h.subscribers[sub]
h.RUnlock()
if ok {
return fmt.Errorf("subscriber %v already exists", h)
}
h.Lock()
h.subscribers[sub] = nil
h.Unlock()
return nil
}
func (h *Server) Register() error {
var eps []*register.Endpoint
h.RLock()
for _, hdlr := range h.handlers {
eps = append(eps, hdlr.Endpoints()...)
}
rsvc := h.rsvc
config := h.opts
h.RUnlock()
func (h *httpServer) Register() error {
h.Lock()
opts := h.opts
eps := h.hd.Endpoints()
h.Unlock()
// if service already filled, reuse it and return early
if rsvc != nil {
if err := server.DefaultRegisterFunc(rsvc, config); err != nil {
return err
}
return nil
}
service, err := server.NewRegisterService(h)
if err != nil {
return err
}
service.Nodes[0].Metadata["protocol"] = "http"
service := serviceDef(opts)
service.Endpoints = eps
h.Lock()
subscriberList := make([]*httpSubscriber, 0, len(h.subscribers))
var subscriberList []*httpSubscriber
for e := range h.subscribers {
// Only advertise non internal subscribers
subscriberList = append(subscriberList, e)
if !e.Options().Internal {
subscriberList = append(subscriberList, e)
}
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
for _, e := range subscriberList {
service.Endpoints = append(service.Endpoints, e.Endpoints()...)
}
h.Unlock()
h.RLock()
registered := h.registered
h.RUnlock()
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
}
rOpts := []registry.RegisterOption{
registry.RegisterTTL(opts.RegisterTTL),
}
// register the service
if err := server.DefaultRegisterFunc(service, config); err != nil {
h.registerOnce.Do(func() {
log.Logf("Registering node: %s", opts.Name+"-"+opts.Id)
})
if err := opts.Registry.Register(service, rOpts...); err != nil {
return err
}
// already registered? don't need to register subscribers
if registered {
h.Lock()
defer h.Unlock()
if h.registered {
return nil
}
h.Lock()
h.registered = true
h.rsvc = service
h.Unlock()
return nil
}
func (h *Server) subscribe() error {
config := h.opts
for sb := range h.subscribers {
handler := h.createSubHandler(sb, config)
var opts []broker.SubscribeOption
for sb, _ := range h.subscribers {
handler := h.createSubHandler(sb, opts)
var subOpts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
subOpts = append(subOpts, broker.Queue(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...)
if err != nil {
h.Unlock()
return err
}
h.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (h *Server) Deregister() error {
h.RLock()
config := h.opts
h.RUnlock()
func (h *httpServer) Deregister() error {
h.Lock()
opts := h.opts
h.Unlock()
service, err := server.NewRegisterService(h)
if err != nil {
return err
}
log.Logf("Deregistering node: %s", opts.Name+"-"+opts.Id)
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Deregistering node: "+service.Nodes[0].ID)
}
if err := server.DefaultDeregisterFunc(service, config); err != nil {
service := serviceDef(opts)
if err := opts.Registry.Deregister(service); err != nil {
return err
}
h.Lock()
h.rsvc = nil
if !h.registered {
h.Unlock()
return nil
}
h.registered = false
subCtx := h.opts.Context
for sb, subs := range h.subscribers {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
for _, sub := range subs {
config.Logger.Info(config.Context, "Unsubscribing from topic: "+sub.Topic())
if err := sub.Unsubscribe(subCtx); err != nil {
h.Unlock()
config.Logger.Error(config.Context, fmt.Sprintf("failed to unsubscribe topic: %s, error", sb.Topic()), err)
return err
}
log.Logf("Unsubscribing from topic: %s", sub.Topic())
sub.Unsubscribe()
}
h.subscribers[sb] = nil
}
@ -488,126 +225,46 @@ func (h *Server) Deregister() error {
return nil
}
func (h *Server) Start() error {
h.RLock()
config := h.opts
h.RUnlock()
// micro: config.Transport.Listen(config.Address)
var ts net.Listener
if l := config.Listener; l != nil {
ts = l
} else {
var err error
// check the tls config for secure connect
if tc := config.TLSConfig; tc != nil {
ts, err = tls.Listen("tcp", config.Address, tc)
// otherwise just plain tcp listener
} else {
ts, err = net.Listen("tcp", config.Address)
}
if err != nil {
return err
}
}
if config.MaxConn > 0 {
ts = netutil.LimitListener(ts, config.MaxConn)
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Listening on "+ts.Addr().String())
}
func (h *httpServer) Start() error {
h.Lock()
h.opts.Address = ts.Addr().String()
opts := h.opts
hd := h.hd
h.Unlock()
var handler http.Handler
// nolint: nestif
if h.opts.Context != nil {
if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
if hs.Handler == nil && h.hd != nil {
if hdlr, ok := h.hd.Handler().(http.Handler); ok {
hs.Handler = hdlr
handler = hs.Handler
}
} else {
handler = hs.Handler
}
}
}
switch {
case handler == nil && h.hd == nil:
handler = h
case len(h.handlers) > 0 && h.hd != nil:
handler = h
case handler == nil && h.hd != nil:
if hdlr, ok := h.hd.Handler().(http.Handler); ok {
handler = hdlr
}
}
if handler == nil {
return fmt.Errorf("cant process with nil handler")
}
if err := config.Broker.Connect(h.opts.Context); err != nil {
ln, err := net.Listen("tcp", opts.Address)
if err != nil {
return err
}
if err := config.RegisterCheck(h.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), err)
}
} else {
if err = h.Register(); err != nil {
return err
}
log.Logf("Listening on %s", ln.Addr().String())
h.Lock()
h.opts.Address = ln.Addr().String()
h.Unlock()
handler, ok := hd.Handler().(http.Handler)
if !ok {
return errors.New("Server required http.Handler")
}
if err := h.subscribe(); err != nil {
if err = opts.Broker.Connect(); err != nil {
return err
}
fn := handler
var hs *http.Server
if h.opts.Context != nil {
if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 {
// wrap the handler func
for i := len(mwf); i > 0; i-- {
fn = mwf[i-1](fn)
}
}
var ok bool
if hs, ok = h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
hs.Handler = fn
} else {
hs = &http.Server{Handler: fn}
}
// register
if err = h.Register(); err != nil {
return err
}
go func() {
if cerr := hs.Serve(ts); cerr != nil && !errors.Is(cerr, http.ErrServerClosed) {
h.opts.Logger.Error(h.opts.Context, "serve error", cerr)
}
h.stateLive.Store(0)
h.stateReady.Store(0)
h.stateHealth.Store(0)
}()
go http.Serve(ln, handler)
go func() {
t := new(time.Ticker)
// only process if it exists
if config.RegisterInterval > time.Duration(0) {
if opts.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(config.RegisterInterval)
t = time.NewTicker(opts.RegisterInterval)
}
// return error chan
@ -618,35 +275,8 @@ func (h *Server) Start() error {
select {
// register self on interval
case <-t.C:
h.RLock()
registered := h.registered
h.RUnlock()
rerr := config.RegisterCheck(h.opts.Context)
// nolint: nestif
if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
}
// deregister self in case of error
if err := h.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error", config.Name, config.ID), err)
}
}
} else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), rerr)
}
continue
}
if err := h.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error", config.Name, config.ID), err)
}
}
if err := h.Register(); err != nil {
config.Logger.Error(config.Context, "Server register error", err)
log.Log("Server register error: ", err)
}
// wait for exit
case ch = <-h.exit:
@ -654,73 +284,35 @@ func (h *Server) Start() error {
}
}
ch <- ln.Close()
// deregister
if err := h.Deregister(); err != nil {
config.Logger.Error(config.Context, "Server deregister error", err)
}
h.Deregister()
if err := config.Broker.Disconnect(config.Context); err != nil {
config.Logger.Error(config.Context, "Broker disconnect error", err)
}
ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout)
defer cancel()
err := hs.Shutdown(ctx)
if err != nil {
err = hs.Close()
}
ch <- err
opts.Broker.Disconnect()
}()
h.stateLive.Store(1)
h.stateReady.Store(1)
h.stateHealth.Store(1)
return nil
}
func (h *Server) Stop() error {
func (h *httpServer) Stop() error {
ch := make(chan error)
h.exit <- ch
return <-ch
}
func (h *Server) String() string {
func (h *httpServer) String() string {
return "http"
}
func (h *Server) Name() string {
return h.opts.Name
}
func (h *Server) Live() bool {
return h.stateLive.Load() == 1
}
func (h *Server) Ready() bool {
return h.stateReady.Load() == 1
}
func (h *Server) Health() bool {
return h.stateHealth.Load() == 1
}
func NewServer(opts ...server.Option) *Server {
options := server.NewOptions(opts...)
eh := DefaultErrorHandler
if v, ok := options.Context.Value(errorHandlerKey{}).(errorHandler); ok && v != nil {
eh = v
}
return &Server{
stateLive: &atomic.Uint32{},
stateReady: &atomic.Uint32{},
stateHealth: &atomic.Uint32{},
opts: options,
exit: make(chan chan error),
subscribers: make(map[*httpSubscriber][]broker.Subscriber),
errorHandler: eh,
pathHandlers: rhttp.NewTrie(),
func newServer(opts ...server.Option) server.Server {
return &httpServer{
opts: newOptions(opts...),
exit: make(chan chan error),
subscribers: make(map[*httpSubscriber][]broker.Subscriber),
}
}
func NewServer(opts ...server.Option) server.Server {
return newServer(opts...)
}

72
http_test.go Normal file
View File

@ -0,0 +1,72 @@
package http
import (
"fmt"
"io/ioutil"
"net/http"
"testing"
"github.com/micro/go-micro/registry/memory"
"github.com/micro/go-micro/server"
)
func TestHTTPServer(t *testing.T) {
reg := memory.NewRegistry()
// create server
srv := NewServer(server.Registry(reg))
// create server mux
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`hello world`))
})
// create handler
hd := srv.NewHandler(mux)
// register handler
if err := srv.Handle(hd); err != nil {
t.Fatal(err)
}
// start server
if err := srv.Start(); err != nil {
t.Fatal(err)
}
// lookup server
service, err := reg.GetService(server.DefaultName)
if err != nil {
t.Fatal(err)
}
if len(service) != 1 {
t.Fatalf("Expected 1 service got %d: %+v", len(service), service)
}
if len(service[0].Nodes) != 1 {
t.Fatalf("Expected 1 node got %d: %+v", len(service[0].Nodes), service[0].Nodes)
}
// make request
rsp, err := http.Get(fmt.Sprintf("http://%s", service[0].Nodes[0].Address))
if err != nil {
t.Fatal(err)
}
defer rsp.Body.Close()
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
t.Fatal(err)
}
if s := string(b); s != "hello world" {
t.Fatalf("Expected response %s, got %s", "hello world", s)
}
// stop server
if err := srv.Stop(); err != nil {
t.Fatal(err)
}
}

View File

@ -1,34 +1,19 @@
package http
import (
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
)
type httpMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
}
func (r *httpMessage) Topic() string {
return r.topic
payload interface{}
}
func (r *httpMessage) ContentType() string {
return r.contentType
}
func (r *httpMessage) Header() metadata.Metadata {
return r.header
func (r *httpMessage) Topic() string {
return r.topic
}
func (r *httpMessage) Body() interface{} {
func (r *httpMessage) Payload() interface{} {
return r.payload
}
func (r *httpMessage) Codec() codec.Codec {
return r.codec
}

View File

@ -2,173 +2,47 @@ package http
import (
"context"
"fmt"
"net/http"
"go.unistack.org/micro/v3/server"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server"
)
// SetError pass error to caller
func SetError(err interface{}) error {
return &Error{err: err}
}
// GetError return underline error
func GetError(err interface{}) interface{} {
if verr, ok := err.(*Error); ok {
return verr.err
func newOptions(opt ...server.Option) server.Options {
opts := server.Options{
Codecs: make(map[string]codec.NewCodec),
Metadata: map[string]string{},
Context: context.Background(),
}
return err
}
// Error struct holds error
type Error struct {
err interface{}
}
// Error func for error interface
func (err *Error) Error() string {
return fmt.Sprintf("%v", err.err)
}
type (
rspCodeKey struct{}
rspCodeVal struct {
code int
for _, o := range opt {
o(&opts)
}
)
type (
rspHeaderKey struct{}
rspHeaderVal struct {
h http.Header
if opts.Broker == nil {
opts.Broker = broker.DefaultBroker
}
)
// SetRspHeader add response headers
func SetRspHeader(ctx context.Context, h http.Header) {
if rsp, ok := ctx.Value(rspHeaderKey{}).(*rspHeaderVal); ok {
rsp.h = h
if opts.Registry == nil {
opts.Registry = registry.DefaultRegistry
}
}
// SetRspCode saves response code in context, must be used by handler to specify http code
func SetRspCode(ctx context.Context, code int) {
if rsp, ok := ctx.Value(rspCodeKey{}).(*rspCodeVal); ok {
rsp.code = code
if len(opts.Address) == 0 {
opts.Address = server.DefaultAddress
}
}
// getRspHeader get http.Header from context
func getRspHeader(ctx context.Context) http.Header {
if rsp, ok := ctx.Value(rspHeaderKey{}).(*rspHeaderVal); ok {
return rsp.h
if len(opts.Name) == 0 {
opts.Name = server.DefaultName
}
return nil
}
// GetRspCode used internally by generated http server handler
func GetRspCode(ctx context.Context) int {
code := int(200)
if rsp, ok := ctx.Value(rspCodeKey{}).(*rspCodeVal); ok {
code = rsp.code
if len(opts.Id) == 0 {
opts.Id = server.DefaultId
}
return code
}
type middlewareKey struct{}
// Middleware passes http middlewares
func Middleware(mw ...func(http.Handler) http.Handler) server.Option {
return server.SetOption(middlewareKey{}, mw)
}
type serverKey struct{}
// HTTPServer provide ability to pass *http.Server
func HTTPServer(hs *http.Server) server.Option {
return server.SetOption(serverKey{}, hs)
}
type errorHandler func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)
type errorHandlerKey struct{}
// ErrorHandler specifies handler for errors
func ErrorHandler(fn errorHandler) server.Option {
return server.SetOption(errorHandlerKey{}, fn)
}
type (
pathHandlerKey struct{}
pathHandlerVal struct {
h map[string]map[string]http.HandlerFunc
if len(opts.Version) == 0 {
opts.Version = server.DefaultVersion
}
)
// PathHandler specifies http handler for path regexp
func PathHandler(method, path string, handler http.HandlerFunc) server.Option {
return func(o *server.Options) {
if o.Context == nil {
o.Context = context.Background()
}
v, ok := o.Context.Value(pathHandlerKey{}).(*pathHandlerVal)
if !ok {
v = &pathHandlerVal{h: make(map[string]map[string]http.HandlerFunc)}
}
m, ok := v.h[method]
if !ok {
m = make(map[string]http.HandlerFunc)
v.h[method] = m
}
m[path] = handler
o.Context = context.WithValue(o.Context, pathHandlerKey{}, v)
}
}
type registerRPCHandlerKey struct{}
// RegisterRPCHandler registers compatibility endpoints with /ServiceName.ServiceEndpoint method POST
func RegisterRPCHandler(b bool) server.Option {
return server.SetOption(registerRPCHandlerKey{}, b)
}
type registerCORSHandlerKey struct{}
// RegisterCORSHandler registers cors endpoints with /ServiceName.ServiceEndpoint method POPTIONSOST
func RegisterCORSHandler(b bool) server.HandlerOption {
return server.SetHandlerOption(registerCORSHandlerKey{}, b)
}
type handlerEndpointsKey struct{}
type EndpointMetadata struct {
Name string
Path string
Method string
Body string
Stream bool
}
func HandlerEndpoints(md []EndpointMetadata) server.HandlerOption {
return server.SetHandlerOption(handlerEndpointsKey{}, md)
}
type handlerOptions struct {
headers []string
cookies []string
}
type FillRequestOption func(*handlerOptions)
func Header(headers ...string) FillRequestOption {
return func(o *handlerOptions) {
o.headers = append(o.headers, headers...)
}
}
func Cookie(cookies ...string) FillRequestOption {
return func(o *handlerOptions) {
o.cookies = append(o.cookies, cookies...)
}
return opts
}

View File

@ -1,90 +0,0 @@
package http
import (
"io"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
)
var (
_ server.Request = &rpcRequest{}
_ server.Message = &rpcMessage{}
)
type rpcRequest struct {
rw io.ReadWriter
payload interface{}
codec codec.Codec
header metadata.Metadata
method string
endpoint string
contentType string
service string
stream bool
}
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
}
func (r *rpcRequest) ContentType() string {
return r.contentType
}
func (r *rpcRequest) Service() string {
return r.service
}
func (r *rpcRequest) Method() string {
return r.method
}
func (r *rpcRequest) Endpoint() string {
return r.endpoint
}
func (r *rpcRequest) Codec() codec.Codec {
return r.codec
}
func (r *rpcRequest) Header() metadata.Metadata {
return r.header
}
func (r *rpcRequest) Read() ([]byte, error) {
return nil, nil
}
func (r *rpcRequest) Stream() bool {
return r.stream
}
func (r *rpcRequest) Body() interface{} {
return r.payload
}
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Body() interface{} {
return r.payload
}
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}
func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}

105
server.go
View File

@ -1,105 +0,0 @@
package http
import (
"context"
"fmt"
"reflect"
"unicode"
"unicode/utf8"
"go.unistack.org/micro/v3/server"
)
type methodType struct {
ArgType reflect.Type
ReplyType reflect.Type
ContextType reflect.Type
method reflect.Method
stream bool
}
// Is this an exported - upper case - name?
func isExported(name string) bool {
r, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(r)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// prepareEndpoint() returns a methodType for the provided method or nil
// in case if the method was unsuitable.
func prepareEndpoint(method reflect.Method) (*methodType, error) {
mtype := method.Type
mname := method.Name
var replyType, argType, contextType reflect.Type
var stream bool
// Endpoint() must be exported.
if method.PkgPath != "" {
return nil, fmt.Errorf("Endpoint must be exported")
}
switch mtype.NumIn() {
case 3:
// assuming streaming
argType = mtype.In(2)
contextType = mtype.In(1)
stream = true
case 4:
// method that takes a context
argType = mtype.In(2)
replyType = mtype.In(3)
contextType = mtype.In(1)
default:
return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
}
switch stream {
case true:
// check stream type
streamType := reflect.TypeOf((*server.Stream)(nil)).Elem()
if !argType.Implements(streamType) {
return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType)
}
default:
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType)
}
if replyType.Kind() != reflect.Ptr {
return nil, fmt.Errorf("method %v reply type not a pointer: %v", mname, replyType)
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
return nil, fmt.Errorf("method %v reply type not exported: %v", mname, replyType)
}
}
// Endpoint() needs one out.
if mtype.NumOut() != 1 {
return nil, fmt.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
return nil, fmt.Errorf("method %v returns %v not error", mname, returnType.String())
}
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil
}
func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
if contextv := reflect.ValueOf(ctx); contextv.IsValid() {
return contextv
}
return reflect.Zero(m.ContextType)
}

View File

@ -1,24 +1,31 @@
package http
import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
"unicode"
"unicode/utf8"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server"
)
const (
subSig = "func(context.Context, interface{}) error"
)
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type handler struct {
method reflect.Value
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type httpSubscriber struct {
@ -27,14 +34,33 @@ type httpSubscriber struct {
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*register.Endpoint
endpoints []*registry.Endpoint
opts server.SubscriberOptions
}
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.NewSubscriberOptions(opts...)
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
var endpoints []*register.Endpoint
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
var options server.SubscriberOptions
for _, o := range opts {
o(&options)
}
var endpoints []*registry.Endpoint
var handlers []*handler
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
@ -51,14 +77,15 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: "Func",
Request: register.ExtractSubValue(typ),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
endpoints = append(endpoints, &registry.Endpoint{
Name: "Func",
Request: extractSubValue(typ),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
@ -78,14 +105,15 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: name + "." + method.Name,
Request: register.ExtractSubValue(method.Type),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
endpoints = append(endpoints, &registry.Endpoint{
Name: name + "." + method.Name,
Request: extractSubValue(method.Type),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
}
}
@ -100,7 +128,61 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
}
}
func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler {
func validateSubscriber(sub server.Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type
if typ.Kind() == reflect.Func {
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of outs: %v require signature %s",
name, typ.NumOut(), subSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
} else {
hdlr := reflect.ValueOf(sub.Subscriber())
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of outs: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}
func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
@ -109,8 +191,12 @@ func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broke
return err
}
hdr := metadata.Copy(msg.Header)
ctx := metadata.NewIncomingContext(context.Background(), hdr)
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
delete(hdr, "Content-Type")
ctx := metadata.NewContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
@ -130,7 +216,15 @@ func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broke
req = req.Elem()
}
if err := cf.Unmarshal(msg.Body, req.Interface()); err != nil {
b := &buffer{bytes.NewBuffer(msg.Body)}
co := cf(b)
defer co.Close()
if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil {
return err
}
if err := co.ReadBody(req.Interface()); err != nil {
return err
}
@ -143,7 +237,7 @@ func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broke
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Body()))
vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
@ -152,19 +246,15 @@ func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broke
return nil
}
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookSubHandler); ok {
fn = h(fn)
}
})
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
go func() {
results <- fn(ctx, &httpMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
codec: cf,
})
}()
}
@ -193,7 +283,7 @@ func (s *httpSubscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *httpSubscriber) Endpoints() []*register.Endpoint {
func (s *httpSubscriber) Endpoints() []*registry.Endpoint {
return s.endpoints
}

55
util.go
View File

@ -1,55 +0,0 @@
package http
import (
"context"
"net/http"
"strings"
"go.unistack.org/micro/v3/metadata"
rutil "go.unistack.org/micro/v3/util/reflect"
)
func FillRequest(ctx context.Context, req interface{}, opts ...FillRequestOption) error {
var err error
options := handlerOptions{}
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil
}
for idx := 0; idx < len(options.headers)/2; idx += 2 {
k := http.CanonicalHeaderKey(options.headers[idx])
v, ok := md[k]
if !ok {
continue
}
if err = rutil.SetFieldByPath(req, v, k); err != nil {
return err
}
}
cookies := strings.Split(md["Cookie"], ";")
cmd := make(map[string]string, len(cookies))
for _, cookie := range cookies {
kv := strings.Split(cookie, "=")
if len(kv) != 2 {
continue
}
cmd[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
}
for idx := 0; idx < len(options.cookies)/2; idx += 2 {
k := http.CanonicalHeaderKey(options.cookies[idx])
v, ok := cmd[k]
if !ok {
continue
}
if err = rutil.SetFieldByPath(req, v, k); err != nil {
return err
}
}
return nil
}

View File

@ -1,100 +0,0 @@
package http
import (
"bytes"
"context"
"net/http"
"strings"
"testing"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/server"
)
func Test_Hook(t *testing.T) {
opts := server.Options{}
var fn server.HandlerFunc = func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
// fmt.Println("1")
return nil
}
var fn2 server.HandlerWrapper = func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// fmt.Println("2")
return next(ctx, req, rsp)
}
}
var fn3 server.HandlerWrapper = func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// fmt.Println("3")
return next(ctx, req, rsp)
}
}
var fn4 server.HandlerWrapper = func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// fmt.Println("4")
return next(ctx, req, rsp)
}
}
opts.Hooks = append(opts.Hooks, fn2, fn3, fn4)
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HandlerWrapper); ok {
// fmt.Printf("h %#+v\n", h)
fn = h(fn)
}
})
err := fn(nil, nil, nil)
if err != nil {
t.Fatal(err)
}
}
func TestFillrequest(t *testing.T) {
md := metadata.New(1)
md.Set("ClientID", "xxx")
type request struct {
Token string
ClientID string
}
ctx := context.Background()
hreq, _ := http.NewRequestWithContext(ctx, http.MethodGet, "/v1", nil)
cookie1 := &http.Cookie{Name: "Token", Value: "zzz"}
cookie2 := &http.Cookie{Name: "Token", Value: "zzz"}
hreq.AddCookie(cookie1)
hreq.AddCookie(cookie2)
buf := bytes.NewBuffer(nil)
_ = hreq.Write(buf)
var cookie string
var line string
var err error
for {
line, err = buf.ReadString('\n')
if err != nil {
break
}
if strings.Contains(line, "Cookie") {
cookie = strings.TrimSpace(strings.Split(line, ":")[1])
break
}
}
md.Set("Cookie", cookie)
ctx = metadata.NewIncomingContext(ctx, md)
req := &request{}
if err := FillRequest(ctx, req, Cookie("Token", "true"), Header("ClientID", "true")); err != nil {
t.Fatal(err)
}
if req.ClientID != "xxx" {
t.Fatalf("FillRequest error: %#+v", req)
}
if req.Token != "zzz" {
t.Fatalf("FillRequest error: %#+v", req)
}
}