Compare commits

..

No commits in common. "master" and "v3.3.1" have entirely different histories.

21 changed files with 469 additions and 1005 deletions

View File

@ -1,19 +0,0 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

20
.github/renovate.json vendored Normal file
View File

@ -0,0 +1,20 @@
{
"extends": [
"config:base"
],
"postUpdateOptions": ["gomodTidy"],
"packageRules": [
{
"matchUpdateTypes": ["minor", "patch", "pin", "digest"],
"automerge": true
},
{
"groupName": "all deps",
"separateMajorMinor": true,
"groupSlug": "all",
"packagePatterns": [
"*"
]
}
]
}

13
.github/stale.sh vendored Executable file
View File

@ -0,0 +1,13 @@
#!/bin/bash -ex
export PATH=$PATH:$(pwd)/bin
export GO111MODULE=on
export GOBIN=$(pwd)/bin
#go get github.com/rvflash/goup@v0.4.1
#goup -v ./...
#go get github.com/psampaz/go-mod-outdated@v0.6.0
go list -u -m -mod=mod -json all | go-mod-outdated -update -direct -ci || true
#go list -u -m -json all | go-mod-outdated -update

View File

@ -1,20 +0,0 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -1,21 +0,0 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@ -3,20 +3,19 @@ on:
push:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
uses: actions/setup-go@v2
with:
go-version: 1.17
go-version: 1.16
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v2
- name: cache
uses: actions/cache@v3
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
@ -32,9 +31,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v2
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
uses: golangci/golangci-lint-action@v2
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.

View File

@ -1,78 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "codeql"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: checkout
uses: actions/checkout@v3
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning.
- name: init
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: analyze
uses: github/codeql-action/analyze@v2

View File

@ -1,27 +0,0 @@
name: "dependabot-automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]'
steps:
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.3.6
with:
github-token: "${{ secrets.TOKEN }}"
- name: merge
id: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@ -3,20 +3,19 @@ on:
pull_request:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
uses: actions/setup-go@v2
with:
go-version: 1.17
go-version: 1.16
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v2
- name: cache
uses: actions/cache@v3
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
@ -32,9 +31,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v2
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
uses: golangci/golangci-lint-action@v2
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.

View File

@ -1,44 +0,0 @@
run:
concurrency: 4
deadline: 5m
issues-exit-code: 1
tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

View File

@ -5,14 +5,14 @@ This plugin is a http client for micro.
## Overview
The http client wraps `net/http` to provide a robust micro client with service discovery, load balancing and streaming.
It complies with the [micro.Client](https://godoc.org/go.unistack.org/micro-client-http/v4#Client) interface.
It complies with the [micro.Client](https://godoc.org/github.com/unistack-org/micro-client-http#Client) interface.
## Usage
### Use directly
```go
import "go.unistack.org/micro-client-http/v4"
import "github.com/unistack-org/micro-client-http"
service := micro.NewService(
micro.Name("my.service"),

6
go.mod
View File

@ -1,5 +1,5 @@
module go.unistack.org/micro-client-http/v4
module github.com/unistack-org/micro-client-http/v3
go 1.19
go 1.16
require go.unistack.org/micro/v4 v4.0.18
require github.com/unistack-org/micro/v3 v3.3.3

23
go.sum
View File

@ -1,6 +1,17 @@
go.unistack.org/micro/v4 v4.0.1 h1:xo1IxbVfgh8i0eY0VeYa3cbb13u5n/Mxnp3FOgWD4Jo=
go.unistack.org/micro/v4 v4.0.1/go.mod h1:p/J5UcSJjfHsWGT31uKoghQ5rUQZzQJBAFy+Z4+ZVMs=
go.unistack.org/micro/v4 v4.0.6 h1:YFWvTh3VwyOd6NHYTQcf47n2TF5+p/EhpnbuBQX3qhk=
go.unistack.org/micro/v4 v4.0.6/go.mod h1:bVEYTlPi0EsdgZZt311bIroDg9ict7ky3C87dSCCAGk=
go.unistack.org/micro/v4 v4.0.18 h1:b7WFwem8Nz1xBrRg5FeLnm9CE5gJseHyf9j0BhkiXW0=
go.unistack.org/micro/v4 v4.0.18/go.mod h1:5+da5r835gP0WnNZbYUJDCvWpJ9Xc3IEGyp62e8o8R4=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/unistack-org/micro/v3 v3.3.3 h1:Igkzl8tWPlIacEK9z8hHVIzhdyzi8drQPt0Am2iHAcA=
github.com/unistack-org/micro/v3 v3.3.3/go.mod h1:tX95c0Qx4w6oqU7qKThs9lya9P507BdZ29MsTVDmU6w=
golang.org/x/net v0.0.0-20210324205630-d1beb07c2056/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

464
http.go
View File

@ -1,5 +1,5 @@
// Package http provides a http client
package http // import "go.unistack.org/micro-client-http/v4"
package http
import (
"bufio"
@ -10,61 +10,56 @@ import (
"net"
"net/http"
"net/url"
"strings"
"sync"
"os"
"time"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/selector"
rutil "go.unistack.org/micro/v4/util/reflect"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/router"
)
var DefaultContentType = "application/json"
var (
DefaultContentType = "application/json"
)
/*
func filterLabel(r []router.Route) []router.Route {
// selector.FilterLabel("protocol", "http")
return r
}
*/
type httpClient struct {
httpcli *http.Client
opts client.Options
sync.RWMutex
dialer *net.Dialer
httpcli *http.Client
init bool
}
func newRequest(ctx context.Context, log logger.Logger, addr string, req client.Request, ct string, cf codec.Codec, msg interface{}, opts client.CallOptions) (*http.Request, error) {
var tags []string
var parameters map[string]map[string]string
scheme := "http"
method := http.MethodPost
func newRequest(addr string, req client.Request, ct string, cf codec.Codec, msg interface{}, opts client.CallOptions) (*http.Request, error) {
hreq := &http.Request{Method: http.MethodPost}
body := "*" // as like google api http annotation
host := addr
path := req.Endpoint()
var tags []string
var scheme string
u, err := url.Parse(addr)
if err == nil {
scheme = u.Scheme
path = u.Path
host = u.Host
} else {
u = &url.URL{Scheme: scheme, Path: path, Host: host}
if err != nil {
hreq.URL = &url.URL{
Scheme: "http",
Host: addr,
Path: req.Endpoint(),
}
// nolint: nestif
hreq.Host = addr
scheme = "http"
} else {
ep := req.Endpoint()
if opts.Context != nil {
if m, ok := opts.Context.Value(methodKey{}).(string); ok {
method = m
hreq.Method = m
}
if p, ok := opts.Context.Value(pathKey{}).(string); ok {
path += p
ep = p
}
if b, ok := opts.Context.Value(bodyKey{}).(string); ok {
body = b
@ -72,31 +67,11 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client.
if t, ok := opts.Context.Value(structTagsKey{}).([]string); ok && len(t) > 0 {
tags = t
}
if k, ok := opts.Context.Value(headerKey{}).([]string); ok && len(k) > 0 {
if parameters == nil {
parameters = make(map[string]map[string]string)
}
m, ok := parameters["header"]
if !ok {
m = make(map[string]string)
parameters["header"] = m
}
for idx := 0; idx < len(k)/2; idx += 2 {
m[k[idx]] = k[idx+1]
}
}
if k, ok := opts.Context.Value(cookieKey{}).([]string); ok && len(k) > 0 {
if parameters == nil {
parameters = make(map[string]map[string]string)
}
m, ok := parameters["cookie"]
if !ok {
m = make(map[string]string)
parameters["cookie"] = m
}
for idx := 0; idx < len(k)/2; idx += 2 {
m[k[idx]] = k[idx+1]
}
hreq.URL, err = u.Parse(ep)
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
}
@ -109,167 +84,112 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client.
}
}
if path == "" {
path = req.Endpoint()
}
u, err = u.Parse(path)
path, nmsg, err := newPathRequest(hreq.URL.Path, hreq.Method, body, msg, tags)
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
var nmsg interface{}
if len(u.Query()) > 0 {
path, nmsg, err = newPathRequest(u.Path+"?"+u.RawQuery, method, body, msg, tags, parameters)
if scheme != "" {
hreq.URL, err = url.Parse(scheme + "://" + addr + path)
} else {
path, nmsg, err = newPathRequest(u.Path, method, body, msg, tags, parameters)
hreq.URL, err = url.Parse(addr + path)
}
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
u, err = url.Parse(fmt.Sprintf("%s://%s%s", scheme, host, path))
// marshal request is struct not empty
if nmsg != nil {
var b []byte
b, err = cf.Marshal(nmsg)
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
var cookies []*http.Cookie
header := make(http.Header)
if opts.Context != nil {
if md, ok := opts.Context.Value(metadataKey{}).(metadata.Metadata); ok {
for k, v := range md {
header[k] = v
}
}
}
if opts.AuthToken != "" {
header.Set(metadata.HeaderAuthorization, opts.AuthToken)
}
if opts.RequestMetadata != nil {
for k, v := range opts.RequestMetadata {
header[k] = v
}
}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md {
header[k] = v
}
}
// set timeout in nanoseconds
if opts.StreamTimeout > time.Duration(0) {
header.Set(metadata.HeaderTimeout, fmt.Sprintf("%d", opts.StreamTimeout))
}
if opts.RequestTimeout > time.Duration(0) {
header.Set(metadata.HeaderTimeout, fmt.Sprintf("%d", opts.RequestTimeout))
}
// set the content type for the request
header.Set(metadata.HeaderContentType, ct)
var v interface{}
for km, vm := range parameters {
for k, required := range vm {
v, err = rutil.StructFieldByPath(msg, k)
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
if rutil.IsZero(v) {
if required == "true" {
return nil, errors.BadRequest("go.micro.client", fmt.Sprintf("required field %s not set", k))
}
continue
}
switch km {
case "header":
header.Set(k, fmt.Sprintf("%v", v))
case "cookie":
cookies = append(cookies, &http.Cookie{Name: k, Value: fmt.Sprintf("%v", v)})
}
}
}
b, err := cf.Marshal(nmsg)
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
var hreq *http.Request
if len(b) > 0 {
hreq, err = http.NewRequestWithContext(ctx, method, u.String(), ioutil.NopCloser(bytes.NewBuffer(b)))
hreq.Body = ioutil.NopCloser(bytes.NewBuffer(b))
hreq.ContentLength = int64(len(b))
header.Set("Content-Length", fmt.Sprintf("%d", hreq.ContentLength))
} else {
hreq, err = http.NewRequestWithContext(ctx, method, u.String(), nil)
}
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
hreq.Header = header
for _, cookie := range cookies {
hreq.AddCookie(cookie)
}
if log.V(logger.DebugLevel) {
log.Debug(ctx, fmt.Sprintf("request %s to %s with headers %v body %s", method, u.String(), hreq.Header, b))
}
return hreq, nil
}
func (h *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
ct := req.ContentType()
if len(opts.ContentType) > 0 {
ct = opts.ContentType
header := make(http.Header, 2)
if md, ok := metadata.FromContext(ctx); ok {
for k, v := range md {
header.Set(k, v)
}
}
ct := req.ContentType()
// set timeout in nanoseconds
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout))
// set the content type for the request
header.Set("Content-Type", ct)
// get codec
cf, err := h.newCodec(ct)
if err != nil {
return errors.BadRequest("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", err.Error())
}
hreq, err := newRequest(ctx, h.opts.Logger, addr, req, ct, cf, req.Body(), opts)
hreq, err := newRequest(addr, req, ct, cf, req.Body(), opts)
if err != nil {
return err
}
hreq.Header = header
// make the request
hrsp, err := h.httpcli.Do(hreq)
hrsp, err := h.httpcli.Do(hreq.WithContext(ctx))
if err != nil {
switch err := err.(type) {
case *url.Error:
if err, ok := err.Err.(net.Error); ok && err.Timeout() {
return errors.Timeout("go.micro.client", err.Error())
}
case net.Error:
if err.Timeout() {
return errors.Timeout("go.micro.client", err.Error())
}
case *url.Error:
if err, ok := err.Err.(net.Error); ok && err.Timeout() {
return errors.Timeout("go.micro.client", err.Error())
}
}
return errors.InternalServerError("go.micro.client", err.Error())
}
defer hrsp.Body.Close()
return h.parseRsp(ctx, hrsp, rsp, opts)
return parseRsp(ctx, hrsp, cf, rsp, opts)
}
func (h *httpClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) {
ct := req.ContentType()
if len(opts.ContentType) > 0 {
ct = opts.ContentType
var header http.Header
if md, ok := metadata.FromContext(ctx); ok {
header = make(http.Header, len(md)+2)
for k, v := range md {
header.Set(k, v)
}
} else {
header = make(http.Header, 2)
}
ct := req.ContentType()
// set timeout in nanoseconds
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout))
// set the content type for the request
header.Set("Content-Type", ct)
// get codec
cf, err := h.newCodec(ct)
cf, err := h.newCodec(req.ContentType())
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
dialAddr := addr
u, err := url.Parse(dialAddr)
if err == nil && u.Scheme != "" && u.Host != "" {
dialAddr = u.Host
}
cc, err := (h.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err))
@ -277,26 +197,19 @@ func (h *httpClient) stream(ctx context.Context, addr string, req client.Request
return &httpStream{
address: addr,
logger: h.opts.Logger,
context: ctx,
closed: make(chan bool),
opts: opts,
conn: cc,
ct: ct,
cf: cf,
header: header,
reader: bufio.NewReader(cc),
request: req,
}, nil
}
func (h *httpClient) newCodec(ct string) (codec.Codec, error) {
h.RLock()
defer h.RUnlock()
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}
if c, ok := h.opts.Codecs[ct]; ok {
return c, nil
}
@ -304,7 +217,7 @@ func (h *httpClient) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType
}
func (h *httpClient) Init(opts ...options.Option) error {
func (h *httpClient) Init(opts ...client.Option) error {
if len(opts) == 0 && h.init {
return nil
}
@ -312,6 +225,9 @@ func (h *httpClient) Init(opts ...options.Option) error {
o(&h.opts)
}
if err := h.opts.Broker.Init(); err != nil {
return err
}
if err := h.opts.Tracer.Init(); err != nil {
return err
}
@ -324,6 +240,9 @@ func (h *httpClient) Init(opts ...options.Option) error {
if err := h.opts.Meter.Init(); err != nil {
return err
}
if err := h.opts.Transport.Init(); err != nil {
return err
}
return nil
}
@ -332,11 +251,15 @@ func (h *httpClient) Options() client.Options {
return h.opts
}
func (h *httpClient) NewRequest(service, method string, req interface{}, opts ...options.Option) client.Request {
return newHTTPRequest(service, method, req, h.opts.ContentType, opts...)
func (h *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
return newHTTPMessage(topic, msg, h.opts.ContentType, opts...)
}
func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...options.Option) error {
func (h *httpClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return newHTTPRequest(service, method, req, h.opts.ContentType, reqOpts...)
}
func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// make a copy of call opts
callOpts := h.opts.CallOptions
for _, opt := range opts {
@ -353,9 +276,8 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
if err := options.Set(&callOpts, time.Until(d), ".RequestTimeout"); err != nil {
return errors.New("go.micro.client", fmt.Sprintf("%v", err.Error()), 400)
}
opt := client.WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
@ -369,9 +291,9 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
hcall := h.call
// wrap the call in reverse
//for i := len(callOpts.CallWrappers); i > 0; i-- {
// hcall = callOpts.CallWrappers[i-1](hcall)
//}
for i := len(callOpts.CallWrappers); i > 0; i-- {
hcall = callOpts.CallWrappers[i-1](hcall)
}
// use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil {
@ -388,7 +310,18 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
callOpts.Address = []string{h.opts.Proxy}
}
var next selector.Next
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err := h.opts.Lookup(ctx, req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return err
}
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
@ -403,22 +336,6 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
time.Sleep(t)
}
if next == nil {
var routes []string
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err = h.opts.Lookup(ctx, req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err = callOpts.Selector.Select(routes)
if err != nil {
return err
}
}
node := next()
// make the call
@ -469,28 +386,25 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
return gerr
}
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...options.Option) (client.Stream, error) {
var err error
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
// make a copy of call opts
callOpts := h.opts.CallOptions
for _, o := range opts {
o(&callOpts)
for _, opt := range opts {
opt(&callOpts)
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok && callOpts.StreamTimeout > time.Duration(0) {
if !ok {
var cancel context.CancelFunc
// no deadline so we create a new one
ctx, cancel = context.WithTimeout(ctx, callOpts.StreamTimeout)
ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
defer cancel()
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
if err = options.Set(&callOpts, time.Until(d), ".StreamTimeout"); err != nil {
return nil, errors.New("go.micro.client", fmt.Sprintf("%v", err.Error()), 400)
}
opt := client.WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
@ -502,7 +416,10 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
/*
// make copy of call method
hstream := h.stream
hstream, err := h.stream()
if err != nil {
return nil, err
}
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
hstream = callOpts.CallWrappers[i-1](hstream)
@ -524,13 +441,24 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
callOpts.Address = []string{h.opts.Proxy}
}
var next selector.Next
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err := h.opts.Lookup(ctx, req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return nil, err
}
call := func(i int) (client.Stream, error) {
// call backoff first. Someone may want an initial start delay
t, cerr := callOpts.Backoff(ctx, req, i)
if cerr != nil {
return nil, errors.InternalServerError("go.micro.client", cerr.Error())
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
@ -538,37 +466,21 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
time.Sleep(t)
}
if next == nil {
var routes []string
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err = h.opts.Lookup(ctx, req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err = callOpts.Selector.Select(routes)
if err != nil {
return nil, err
}
}
node := next()
stream, cerr := h.stream(ctx, node, req, callOpts)
stream, err := h.stream(ctx, node, req, callOpts)
// record the result of the call to inform future routing decisions
if verr := h.opts.Selector.Record(node, cerr); verr != nil {
if verr := h.opts.Selector.Record(node, err); verr != nil {
return nil, verr
}
// try and transform the error to a go-micro error
if verr, ok := cerr.(*errors.Error); ok {
if verr, ok := err.(*errors.Error); ok {
return nil, verr
}
return stream, cerr
return stream, err
}
type response struct {
@ -581,8 +493,8 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
for i := 0; i <= callOpts.Retries; i++ {
go func() {
s, cerr := call(i)
ch <- response{s, cerr}
s, err := call(i)
ch <- response{s, err}
}()
select {
@ -610,6 +522,52 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
return nil, grr
}
func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
options := client.NewPublishOptions(opts...)
md, ok := metadata.FromContext(ctx)
if !ok {
md = metadata.New(2)
}
md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic()
cf, err := h.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
var body []byte
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data
} else {
b := bytes.NewBuffer(nil)
if err := cf.Write(b, &codec.Message{Type: codec.Event}, p.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b.Bytes()
}
topic := p.Topic()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
return h.opts.Broker.Publish(ctx, topic, &broker.Message{
Header: md,
Body: body,
}, broker.PublishContext(ctx))
}
func (h *httpClient) String() string {
return "http"
}
@ -618,7 +576,7 @@ func (h *httpClient) Name() string {
return h.opts.Name
}
func NewClient(opts ...options.Option) client.Client {
func NewClient(opts ...client.Option) client.Client {
options := client.NewOptions(opts...)
if len(options.ContentType) == 0 {
@ -629,33 +587,20 @@ func NewClient(opts ...options.Option) client.Client {
opts: options,
}
var dialer func(context.Context, string) (net.Conn, error)
if v, ok := options.Context.Value(httpDialerKey{}).(*net.Dialer); ok {
dialer = func(ctx context.Context, addr string) (net.Conn, error) {
return v.DialContext(ctx, "tcp", addr)
}
}
if options.ContextDialer != nil {
dialer = options.ContextDialer
}
if dialer == nil {
dialer = func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{
dialer, ok := options.Context.Value(httpDialerKey{}).(*net.Dialer)
if !ok {
dialer = &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext(ctx, "tcp", addr)
}
}
if httpcli, ok := options.Context.Value(httpClientKey{}).(*http.Client); ok {
rc.httpcli = httpcli
} else {
// TODO customTransport := http.DefaultTransport.(*http.Transport).Clone()
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialer(ctx, addr)
},
DialContext: dialer.DialContext,
ForceAttemptHTTP2: true,
MaxConnsPerHost: 100,
MaxIdleConns: 20,
@ -668,5 +613,10 @@ func NewClient(opts ...options.Option) client.Client {
}
c := client.Client(rc)
// wrap in reverse
for i := len(options.Wrappers); i > 0; i-- {
c = options.Wrappers[i-1](c)
}
return c
}

View File

@ -8,56 +8,14 @@ import (
type Request struct {
Name string `json:"name"`
Field1 string `json:"field1"`
ClientID string
Field1 string
Field2 string
Field3 int64
}
func TestPathWithHeader(t *testing.T) {
req := &Request{Name: "vtolstov", Field1: "field1", ClientID: "1234567890"}
p, m, err := newPathRequest(
"/api/v1/test?Name={name}&Field1={field1}",
"POST",
"*",
req,
nil,
map[string]map[string]string{"header": {"ClientID": "true"}},
)
if err != nil {
t.Fatal(err)
}
u, err := url.Parse(p)
if err != nil {
t.Fatal(err)
}
if m != nil {
t.Fatal("new struct must be nil")
}
if u.Query().Get("Name") != "vtolstov" || u.Query().Get("Field1") != "field1" {
t.Fatalf("invalid values %v", u.Query())
}
}
func TestPathValues(t *testing.T) {
req := &Request{Name: "vtolstov", Field1: "field1"}
p, m, err := newPathRequest("/api/v1/test?Name={name}&Field1={field1}", "POST", "*", req, nil, nil)
if err != nil {
t.Fatal(err)
}
u, err := url.Parse(p)
if err != nil {
t.Fatal(err)
}
_ = m
if u.Query().Get("Name") != "vtolstov" || u.Query().Get("Field1") != "field1" {
t.Fatalf("invalid values %v", u.Query())
}
}
func TestValidPath(t *testing.T) {
req := &Request{Name: "vtolstov", Field1: "field1", Field2: "field2", Field3: 10}
p, m, err := newPathRequest("/api/v1/{name}/list", "GET", "", req, nil, nil)
p, m, err := newPathRequest("/api/v1/{name}/list", "GET", "", req, nil)
if err != nil {
t.Fatal(err)
}
@ -74,8 +32,9 @@ func TestValidPath(t *testing.T) {
func TestInvalidPath(t *testing.T) {
req := &Request{Name: "vtolstov", Field1: "field1", Field2: "field2", Field3: 10}
_, _, err := newPathRequest("/api/v1/{xname}/list", "GET", "", req, nil, nil)
p, m, err := newPathRequest("/api/v1/{xname}/list", "GET", "", req, nil)
if err == nil {
t.Fatal("path param must not be filled")
t.Fatalf("path param must not be filled")
}
_, _ = p, m
}

36
message.go Normal file
View File

@ -0,0 +1,36 @@
package http
import (
"github.com/unistack-org/micro/v3/client"
)
type httpMessage struct {
topic string
contentType string
payload interface{}
}
func newHTTPMessage(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
options := client.NewMessageOptions(opts...)
if len(options.ContentType) == 0 {
options.ContentType = contentType
}
return &httpMessage{
payload: payload,
topic: topic,
contentType: options.ContentType,
}
}
func (h *httpMessage) ContentType() string {
return h.contentType
}
func (h *httpMessage) Topic() string {
return h.topic
}
func (h *httpMessage) Payload() interface{} {
return h.payload
}

View File

@ -4,8 +4,7 @@ import (
"net"
"net/http"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/options"
"github.com/unistack-org/micro/v3/client"
)
var (
@ -27,100 +26,70 @@ var (
)
type poolMaxStreams struct{}
// PoolMaxStreams maximum streams on a connectioin
func PoolMaxStreams(n int) options.Option {
return options.ContextOption(poolMaxStreams{}, n)
}
type poolMaxIdle struct{}
// PoolMaxIdle maximum idle conns of a pool
func PoolMaxIdle(d int) options.Option {
return options.ContextOption(poolMaxIdle{}, d)
}
type codecsKey struct{}
type tlsAuth struct{}
type maxRecvMsgSizeKey struct{}
// MaxRecvMsgSize set the maximum size of message that client can receive.
func MaxRecvMsgSize(s int) options.Option {
return options.ContextOption(maxRecvMsgSizeKey{}, s)
}
type maxSendMsgSizeKey struct{}
// PoolMaxStreams maximum streams on a connectioin
func PoolMaxStreams(n int) client.Option {
return client.SetOption(poolMaxStreams{}, n)
}
// PoolMaxIdle maximum idle conns of a pool
func PoolMaxIdle(d int) client.Option {
return client.SetOption(poolMaxIdle{}, d)
}
// MaxRecvMsgSize set the maximum size of message that client can receive.
func MaxRecvMsgSize(s int) client.Option {
return client.SetOption(maxRecvMsgSizeKey{}, s)
}
// MaxSendMsgSize set the maximum size of message that client can send.
func MaxSendMsgSize(s int) options.Option {
return options.ContextOption(maxSendMsgSizeKey{}, s)
func MaxSendMsgSize(s int) client.Option {
return client.SetOption(maxSendMsgSizeKey{}, s)
}
type httpClientKey struct{}
// nolint: golint
// HTTPClient pass http.Client option to client Call
func HTTPClient(c *http.Client) options.Option {
return options.ContextOption(httpClientKey{}, c)
func HTTPClient(c *http.Client) client.Option {
return client.SetOption(httpClientKey{}, c)
}
type httpDialerKey struct{}
// nolint: golint
// HTTPDialer pass net.Dialer option to client
func HTTPDialer(d *net.Dialer) options.Option {
return options.ContextOption(httpDialerKey{}, d)
func HTTPDialer(d *net.Dialer) client.Option {
return client.SetOption(httpDialerKey{}, d)
}
type methodKey struct{}
// Method pass method option to client Call
func Method(m string) options.Option {
return options.ContextOption(methodKey{}, m)
func Method(m string) client.CallOption {
return client.SetCallOption(methodKey{}, m)
}
type pathKey struct{}
// Path spcecifies path option to client Call
func Path(p string) options.Option {
return options.ContextOption(pathKey{}, p)
func Path(p string) client.CallOption {
return client.SetCallOption(pathKey{}, p)
}
type bodyKey struct{}
// Body specifies body option to client Call
func Body(b string) options.Option {
return options.ContextOption(bodyKey{}, b)
func Body(b string) client.CallOption {
return client.SetCallOption(bodyKey{}, b)
}
type errorMapKey struct{}
func ErrorMap(m map[string]interface{}) options.Option {
return options.ContextOption(errorMapKey{}, m)
func ErrorMap(m map[string]interface{}) client.CallOption {
return client.SetCallOption(errorMapKey{}, m)
}
type structTagsKey struct{}
// StructTags pass tags slice option to client Call
func StructTags(tags []string) options.Option {
return options.ContextOption(structTagsKey{}, tags)
}
type metadataKey struct{}
// Metadata pass metadata to client Call
func Metadata(md metadata.Metadata) options.Option {
return options.ContextOption(metadataKey{}, md)
}
type cookieKey struct{}
// Cookie pass cookie to client Call
func Cookie(cookies ...string) options.Option {
return options.ContextOption(cookieKey{}, cookies)
}
type headerKey struct{}
// Header pass cookie to client Call
func Header(headers ...string) options.Option {
return options.ContextOption(headerKey{}, headers)
func StructTags(tags []string) client.CallOption {
return client.SetCallOption(structTagsKey{}, tags)
}

View File

@ -1,9 +1,8 @@
package http
import (
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/options"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
)
type httpRequest struct {
@ -14,8 +13,9 @@ type httpRequest struct {
opts client.RequestOptions
}
func newHTTPRequest(service, method string, request interface{}, contentType string, opts ...options.Option) client.Request {
func newHTTPRequest(service, method string, request interface{}, contentType string, opts ...client.RequestOption) client.Request {
options := client.NewRequestOptions(opts...)
if len(options.ContentType) == 0 {
options.ContentType = contentType
}

104
stream.go
View File

@ -4,34 +4,35 @@ import (
"bufio"
"context"
"fmt"
"io"
"net"
"net/http"
"sync"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/logger"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
)
// Implements the streamer interface
type httpStream struct {
err error
conn net.Conn
sync.RWMutex
address string
opts client.CallOptions
ct string
cf codec.Codec
context context.Context
logger logger.Logger
request client.Request
header http.Header
seq uint64
closed chan bool
err error
conn net.Conn
reader *bufio.Reader
address string
ct string
opts client.CallOptions
sync.RWMutex
request client.Request
}
var errShutdown = fmt.Errorf("connection is shut down")
var (
errShutdown = fmt.Errorf("connection is shut down")
)
func (h *httpStream) isClosed() bool {
select {
@ -54,10 +55,6 @@ func (h *httpStream) Response() client.Response {
return nil
}
func (h *httpStream) SendMsg(msg interface{}) error {
return h.Send(msg)
}
func (h *httpStream) Send(msg interface{}) error {
h.Lock()
defer h.Unlock()
@ -67,16 +64,14 @@ func (h *httpStream) Send(msg interface{}) error {
return errShutdown
}
hreq, err := newRequest(h.context, h.logger, h.address, h.request, h.ct, h.cf, msg, h.opts)
hreq, err := newRequest(h.address, h.request, h.ct, h.cf, msg, h.opts)
if err != nil {
return err
}
return hreq.Write(h.conn)
}
hreq.Header = h.header
func (h *httpStream) RecvMsg(msg interface{}) error {
return h.Recv(msg)
return hreq.Write(h.conn)
}
func (h *httpStream) Recv(msg interface{}) error {
@ -94,7 +89,7 @@ func (h *httpStream) Recv(msg interface{}) error {
}
defer hrsp.Body.Close()
return h.parseRsp(h.context, h.logger, hrsp, h.cf, msg, h.opts)
return parseRsp(h.context, hrsp, h.cf, msg, h.opts)
}
func (h *httpStream) Error() error {
@ -103,10 +98,6 @@ func (h *httpStream) Error() error {
return h.err
}
func (h *httpStream) CloseSend() error {
return h.Close()
}
func (h *httpStream) Close() error {
select {
case <-h.closed:
@ -116,60 +107,3 @@ func (h *httpStream) Close() error {
return h.conn.Close()
}
}
func (h *httpStream) parseRsp(ctx context.Context, log logger.Logger, hrsp *http.Response, cf codec.Codec, rsp interface{}, opts client.CallOptions) error {
var err error
var buf []byte
// fast path return
if hrsp.StatusCode == http.StatusNoContent {
return nil
}
select {
case <-ctx.Done():
err = ctx.Err()
default:
if hrsp.Body != nil {
buf, err = io.ReadAll(hrsp.Body)
if err != nil {
if log.V(logger.ErrorLevel) {
log.Error(ctx, "failed to read body", err)
}
return errors.InternalServerError("go.micro.client", string(buf))
}
}
if log.V(logger.DebugLevel) {
log.Debug(ctx, fmt.Sprintf("response %s with %v", buf, hrsp.Header))
}
if hrsp.StatusCode < 400 {
if err = cf.Unmarshal(buf, rsp); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
return nil
}
var rerr interface{}
errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{})
if ok && errmap != nil {
if rerr, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok {
rerr, ok = errmap["default"].(error)
}
}
if !ok || rerr == nil {
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
}
if cerr := cf.Unmarshal(buf, rerr); cerr != nil {
return errors.InternalServerError("go.micro.client", cerr.Error())
}
if err, ok = rerr.(error); !ok {
err = &Error{rerr}
}
}
return err
}

283
util.go
View File

@ -3,70 +3,38 @@ package http
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
rutil "go.unistack.org/micro/v4/util/reflect"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
rutil "github.com/unistack-org/micro/v3/util/reflect"
util "github.com/unistack-org/micro/v3/util/router"
)
var (
templateCache = make(map[string][]string)
templateCache = make(map[string]util.Template)
mu sync.RWMutex
)
// Error struct holds error
type Error struct {
err interface{}
}
// Error func for error interface
func (err *Error) Error() string {
return fmt.Sprintf("%v", err.err)
}
func GetError(err error) interface{} {
if rerr, ok := err.(*Error); ok {
return rerr.err
}
return err
}
func newPathRequest(path string, method string, body string, msg interface{}, tags []string, parameters map[string]map[string]string) (string, interface{}, error) {
func newPathRequest(path string, method string, body string, msg interface{}, tags []string) (string, interface{}, error) {
// parse via https://github.com/googleapis/googleapis/blob/master/google/api/http.proto definition
tpl, err := newTemplate(path)
if err != nil {
return "", nil, err
}
if len(tpl) > 0 && msg == nil {
if len(tpl.Fields) > 0 && msg == nil {
return "", nil, fmt.Errorf("nil message but path params requested: %v", path)
}
fieldsmapskip := make(map[string]struct{})
fieldsmap := make(map[string]string, len(tpl))
for _, v := range tpl {
var vs, ve int
for i := 0; i < len(v); i++ {
switch v[i] {
case '{':
vs = i + 1
case '}':
ve = i
}
if ve != 0 {
fieldsmap[v[vs:ve]] = ""
vs = 0
ve = 0
}
}
fieldsmap := make(map[string]string, len(tpl.Fields))
for _, v := range tpl.Fields {
fieldsmap[v] = ""
}
nmsg, err := rutil.Zero(msg)
@ -85,7 +53,7 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
tnmsg = tnmsg.Elem()
}
values := url.Values{}
values := make(map[string]string)
// copy cycle
for i := 0; i < tmsg.NumField(); i++ {
val := tmsg.Field(i)
@ -93,15 +61,7 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
continue
}
fld := tmsg.Type().Field(i)
// Skip unexported fields.
if fld.PkgPath != "" {
continue
}
/* check for empty PkgPath can be replaced with new method IsExported
if !fld.IsExported() {
continue
}
*/
t := &tag{}
for _, tn := range tags {
ts, ok := fld.Tag.Lookup(tn)
@ -113,129 +73,67 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
// special
switch tn {
case "protobuf": // special
for _, p := range tp {
if idx := strings.Index(p, "name="); idx > 0 {
t = &tag{key: tn, name: p[idx:]}
}
}
t = &tag{key: tn, name: tp[3][5:], opts: append(tp[:3], tp[4:]...)}
default:
t = &tag{key: tn, name: tp[0]}
t = &tag{key: tn, name: tp[0], opts: tp[1:]}
}
if t.name != "" {
break
}
}
cname := t.name
if cname == "" {
cname = fld.Name
if t.name == "" {
// fallback to lowercase
t.name = strings.ToLower(fld.Name)
}
if _, ok := parameters["header"][cname]; ok {
continue
}
if _, ok := parameters["cookie"][cname]; ok {
continue
}
if !val.IsValid() || val.IsZero() {
continue
}
// nolint: gocritic, nestif
if _, ok := fieldsmap[t.name]; ok {
switch val.Type().Kind() {
case reflect.Slice:
for idx := 0; idx < val.Len(); idx++ {
values.Add(t.name, getParam(val.Index(idx)))
}
fieldsmapskip[t.name] = struct{}{}
default:
fieldsmap[t.name] = getParam(val)
}
fieldsmap[t.name] = fmt.Sprintf("%v", val.Interface())
} else if (body == "*" || body == t.name) && method != http.MethodGet {
if tnmsg.Field(i).CanSet() {
tnmsg.Field(i).Set(val)
}
} else {
if val.Type().Kind() == reflect.Slice {
for idx := 0; idx < val.Len(); idx++ {
values.Add(t.name, getParam(val.Index(idx)))
}
} else {
values.Add(t.name, getParam(val))
}
values[t.name] = fmt.Sprintf("%v", val.Interface())
}
}
// check not filled stuff
for k, v := range fieldsmap {
_, ok := fieldsmapskip[k]
if !ok && v == "" {
if v == "" {
return "", nil, fmt.Errorf("path param %s not filled", k)
}
}
var b strings.Builder
for _, fld := range tpl {
for _, fld := range tpl.Pool {
_, _ = b.WriteRune('/')
// nolint: nestif
var vs, ve, vf int
var pholder bool
for i := 0; i < len(fld); i++ {
switch fld[i] {
case '{':
vs = i + 1
case '}':
ve = i
}
// nolint: nestif
if vs > 0 && ve != 0 {
if vm, ok := fieldsmap[fld[vs:ve]]; ok {
if vm != "" {
_, _ = b.WriteString(fld[vf : vs-1])
_, _ = b.WriteString(vm)
vf = ve + 1
}
if v, ok := fieldsmap[fld]; ok {
_, _ = b.WriteString(v)
} else {
_, _ = b.WriteString(fld)
}
vs = 0
ve = 0
pholder = true
}
}
if !pholder {
_, _ = b.WriteString(fld)
}
}
if len(values) > 0 {
idx := 0
for k, v := range values {
if idx == 0 {
_, _ = b.WriteRune('?')
_, _ = b.WriteString(values.Encode())
} else {
_, _ = b.WriteRune('&')
}
_, _ = b.WriteString(k)
_, _ = b.WriteRune('=')
_, _ = b.WriteString(v)
idx++
}
if rutil.IsZero(nmsg) && !isEmptyStruct(nmsg) {
if rutil.IsZero(nmsg) {
return b.String(), nil, nil
}
return b.String(), nmsg, nil
}
func isEmptyStruct(v interface{}) bool {
val := reflect.ValueOf(v)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
return val.Kind() == reflect.Struct && val.NumField() == 0
}
func newTemplate(path string) ([]string, error) {
if len(path) == 0 || path[0] != '/' {
return nil, fmt.Errorf("path must starts with /")
}
func newTemplate(path string) (util.Template, error) {
mu.RLock()
tpl, ok := templateCache[path]
if ok {
@ -244,7 +142,12 @@ func newTemplate(path string) ([]string, error) {
}
mu.RUnlock()
tpl = strings.Split(path[1:], "/")
rule, err := util.Parse(path)
if err != nil {
return tpl, err
}
tpl = rule.Compile()
mu.Lock()
templateCache[path] = tpl
mu.Unlock()
@ -252,113 +155,43 @@ func newTemplate(path string) ([]string, error) {
return tpl, nil
}
func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp interface{}, opts client.CallOptions) error {
var err error
var buf []byte
// fast path return
if hrsp.StatusCode == http.StatusNoContent {
return nil
}
if opts.ResponseMetadata != nil {
*opts.ResponseMetadata = metadata.New(len(hrsp.Header))
for k, v := range hrsp.Header {
opts.ResponseMetadata.Set(k, strings.Join(v, ","))
}
}
select {
case <-ctx.Done():
err = ctx.Err()
default:
ct := DefaultContentType
if htype := hrsp.Header.Get("Content-Type"); htype != "" {
ct = htype
}
if hrsp.Body != nil {
buf, err = io.ReadAll(hrsp.Body)
func parseRsp(ctx context.Context, hrsp *http.Response, cf codec.Codec, rsp interface{}, opts client.CallOptions) error {
b, err := ioutil.ReadAll(hrsp.Body)
if err != nil {
if h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(ctx, "failed to read body", err)
}
return errors.InternalServerError("go.micro.client", string(buf))
}
return errors.InternalServerError("go.micro.client", err.Error())
}
cf, cerr := h.newCodec(ct)
if cerr != nil {
if h.opts.Logger.V(logger.DebugLevel) {
h.opts.Logger.Debug(ctx, fmt.Sprintf("response with %v unknown content-type %s %s", hrsp.Header, ct, buf))
}
return errors.InternalServerError("go.micro.client", cerr.Error())
}
if h.opts.Logger.V(logger.DebugLevel) {
h.opts.Logger.Debug(ctx, fmt.Sprintf("response %s with %v", buf, hrsp.Header))
}
// succeseful response
if hrsp.StatusCode < 400 {
if err = cf.Unmarshal(buf, rsp); err != nil {
// unmarshal
if err := cf.Unmarshal(b, rsp); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
return nil
}
// response with error
var rerr interface{}
errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{})
if ok && errmap != nil {
rerr, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)]
if !ok || errmap == nil {
// user not provide map of errors
// id: req.Service() ??
return errors.New("go.micro.client", string(b), int32(hrsp.StatusCode))
}
if err, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok {
err, ok = errmap["default"].(error)
}
if !ok {
rerr, ok = errmap["default"]
}
return errors.New("go.micro.client", string(b), int32(hrsp.StatusCode))
}
if !ok || rerr == nil {
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
}
if cerr := cf.Unmarshal(buf, rerr); cerr != nil {
if cerr := cf.Unmarshal(b, err); cerr != nil {
return errors.InternalServerError("go.micro.client", cerr.Error())
}
if err, ok = rerr.(error); !ok {
err = &Error{rerr}
}
}
return err
}
type tag struct {
key string
name string
}
func getParam(val reflect.Value) string {
var v string
switch val.Kind() {
case reflect.Ptr:
switch reflect.Indirect(val).Type().String() {
case
"wrapperspb.BoolValue",
"wrapperspb.BytesValue",
"wrapperspb.DoubleValue",
"wrapperspb.FloatValue",
"wrapperspb.Int32Value", "wrapperspb.Int64Value",
"wrapperspb.StringValue",
"wrapperspb.UInt32Value", "wrapperspb.UInt64Value":
if eva := reflect.Indirect(val).FieldByName("Value"); eva.IsValid() {
v = getParam(eva)
}
}
default:
v = fmt.Sprintf("%v", val.Interface())
}
return v
opts []string
}

View File

@ -5,46 +5,28 @@ import (
"testing"
)
func TestParsing(t *testing.T) {
type Message struct {
IIN string `protobuf:"bytes,1,opt,name=iin,proto3" json:"iin"`
}
omsg := &Message{IIN: "5555"}
for _, m := range []string{"POST"} {
body := ""
path, nmsg, err := newPathRequest("/users/iin/{iin}/push-notifications", m, body, omsg, []string{"protobuf", "json"}, nil)
func TestTemplate(t *testing.T) {
tpl, err := newTemplate("/v1/{ClientID}/list")
if err != nil {
t.Fatal(err)
}
u, err := url.Parse(path)
if err != nil {
t.Fatal(err)
}
_ = nmsg
if u.Path != "/users/iin/5555/push-notifications" {
t.Fatalf("newPathRequest invalid path %s", u.Path)
}
if nmsg != nil {
t.Fatalf("new message must be nil: %v\n", nmsg)
}
}
_ = tpl
// fmt.Printf("%#+v\n", tpl.Pool)
}
func TestNewPathRequest(t *testing.T) {
type Message struct {
Name string `json:"name"`
Val1 string `protobuf:"bytes,1,opt,name=val1,proto3" json:"val1"`
Val3 []string
Val2 int64
Val3 []string
}
omsg := &Message{Name: "test_name", Val1: "test_val1", Val2: 100, Val3: []string{"slice"}}
for _, m := range []string{"POST", "PUT", "PATCH", "GET", "DELETE"} {
body := ""
path, nmsg, err := newPathRequest("/v1/test", m, body, omsg, []string{"protobuf", "json"}, nil)
path, nmsg, err := newPathRequest("/v1/test", m, body, omsg, []string{"protobuf", "json"})
if err != nil {
t.Fatal(err)
}
@ -59,44 +41,12 @@ func TestNewPathRequest(t *testing.T) {
}
}
func TestNewPathRequestWithEmptyBody(t *testing.T) {
val := struct{}{}
cases := []string{
"",
"*",
"{}",
"nil",
`{"type": "invalid"}`,
}
for _, body := range cases {
for _, m := range []string{"POST", "PUT", "PATCH", "GET", "DELETE"} {
path, nmsg, err := newPathRequest("/v1/test", m, body, val, []string{"protobuf", "json"}, nil)
if err != nil {
t.Fatal(err)
}
if nmsg == nil {
t.Fatalf("invalid path: nil nmsg")
}
u, err := url.Parse(path)
if err != nil {
t.Fatal(err)
}
vals := u.Query()
if len(vals) != 0 {
t.Fatalf("invalid path: %v nmsg: %v", path, nmsg)
}
}
}
}
func TestNewPathVarRequest(t *testing.T) {
type Message struct {
Name string `json:"name"`
Val1 string `protobuf:"bytes,1,opt,name=val1,proto3" json:"val1"`
Val3 []string
Val2 int64
Val3 []string
}
omsg := &Message{Name: "test_name", Val1: "test_val1", Val2: 100, Val3: []string{"slice"}}
@ -106,7 +56,7 @@ func TestNewPathVarRequest(t *testing.T) {
if m != "GET" {
body = "*"
}
path, nmsg, err := newPathRequest("/v1/test/{val1}", m, body, omsg, []string{"protobuf", "json"}, nil)
path, nmsg, err := newPathRequest("/v1/test/{val1}", m, body, omsg, []string{"protobuf", "json"})
if err != nil {
t.Fatal(err)
}