Compare commits

..

No commits in common. "master" and "v3.3.1" have entirely different histories.

21 changed files with 469 additions and 1005 deletions

View File

@ -1,19 +0,0 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

20
.github/renovate.json vendored Normal file
View File

@ -0,0 +1,20 @@
{
"extends": [
"config:base"
],
"postUpdateOptions": ["gomodTidy"],
"packageRules": [
{
"matchUpdateTypes": ["minor", "patch", "pin", "digest"],
"automerge": true
},
{
"groupName": "all deps",
"separateMajorMinor": true,
"groupSlug": "all",
"packagePatterns": [
"*"
]
}
]
}

13
.github/stale.sh vendored Executable file
View File

@ -0,0 +1,13 @@
#!/bin/bash -ex
export PATH=$PATH:$(pwd)/bin
export GO111MODULE=on
export GOBIN=$(pwd)/bin
#go get github.com/rvflash/goup@v0.4.1
#goup -v ./...
#go get github.com/psampaz/go-mod-outdated@v0.6.0
go list -u -m -mod=mod -json all | go-mod-outdated -update -direct -ci || true
#go list -u -m -json all | go-mod-outdated -update

View File

@ -1,20 +0,0 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -1,21 +0,0 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@ -3,20 +3,19 @@ on:
push: push:
branches: branches:
- master - master
- v3
jobs: jobs:
test: test:
name: test name: test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: setup - name: setup
uses: actions/setup-go@v3 uses: actions/setup-go@v2
with: with:
go-version: 1.17 go-version: 1.16
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: cache - name: cache
uses: actions/cache@v3 uses: actions/cache@v2
with: with:
path: ~/go/pkg/mod path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
@ -32,9 +31,9 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: lint - name: lint
uses: golangci/golangci-lint-action@v3.4.0 uses: golangci/golangci-lint-action@v2
continue-on-error: true continue-on-error: true
with: with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.

View File

@ -1,78 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "codeql"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: checkout
uses: actions/checkout@v3
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning.
- name: init
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: analyze
uses: github/codeql-action/analyze@v2

View File

@ -1,27 +0,0 @@
name: "dependabot-automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]'
steps:
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.3.6
with:
github-token: "${{ secrets.TOKEN }}"
- name: merge
id: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@ -3,20 +3,19 @@ on:
pull_request: pull_request:
branches: branches:
- master - master
- v3
jobs: jobs:
test: test:
name: test name: test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: setup - name: setup
uses: actions/setup-go@v3 uses: actions/setup-go@v2
with: with:
go-version: 1.17 go-version: 1.16
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: cache - name: cache
uses: actions/cache@v3 uses: actions/cache@v2
with: with:
path: ~/go/pkg/mod path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
@ -32,9 +31,9 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: lint - name: lint
uses: golangci/golangci-lint-action@v3.4.0 uses: golangci/golangci-lint-action@v2
continue-on-error: true continue-on-error: true
with: with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.

View File

@ -1,44 +0,0 @@
run:
concurrency: 4
deadline: 5m
issues-exit-code: 1
tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

View File

@ -5,14 +5,14 @@ This plugin is a http client for micro.
## Overview ## Overview
The http client wraps `net/http` to provide a robust micro client with service discovery, load balancing and streaming. The http client wraps `net/http` to provide a robust micro client with service discovery, load balancing and streaming.
It complies with the [micro.Client](https://godoc.org/go.unistack.org/micro-client-http/v4#Client) interface. It complies with the [micro.Client](https://godoc.org/github.com/unistack-org/micro-client-http#Client) interface.
## Usage ## Usage
### Use directly ### Use directly
```go ```go
import "go.unistack.org/micro-client-http/v4" import "github.com/unistack-org/micro-client-http"
service := micro.NewService( service := micro.NewService(
micro.Name("my.service"), micro.Name("my.service"),

6
go.mod
View File

@ -1,5 +1,5 @@
module go.unistack.org/micro-client-http/v4 module github.com/unistack-org/micro-client-http/v3
go 1.19 go 1.16
require go.unistack.org/micro/v4 v4.0.18 require github.com/unistack-org/micro/v3 v3.3.3

23
go.sum
View File

@ -1,6 +1,17 @@
go.unistack.org/micro/v4 v4.0.1 h1:xo1IxbVfgh8i0eY0VeYa3cbb13u5n/Mxnp3FOgWD4Jo= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
go.unistack.org/micro/v4 v4.0.1/go.mod h1:p/J5UcSJjfHsWGT31uKoghQ5rUQZzQJBAFy+Z4+ZVMs= github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
go.unistack.org/micro/v4 v4.0.6 h1:YFWvTh3VwyOd6NHYTQcf47n2TF5+p/EhpnbuBQX3qhk= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
go.unistack.org/micro/v4 v4.0.6/go.mod h1:bVEYTlPi0EsdgZZt311bIroDg9ict7ky3C87dSCCAGk= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
go.unistack.org/micro/v4 v4.0.18 h1:b7WFwem8Nz1xBrRg5FeLnm9CE5gJseHyf9j0BhkiXW0= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
go.unistack.org/micro/v4 v4.0.18/go.mod h1:5+da5r835gP0WnNZbYUJDCvWpJ9Xc3IEGyp62e8o8R4= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/unistack-org/micro/v3 v3.3.3 h1:Igkzl8tWPlIacEK9z8hHVIzhdyzi8drQPt0Am2iHAcA=
github.com/unistack-org/micro/v3 v3.3.3/go.mod h1:tX95c0Qx4w6oqU7qKThs9lya9P507BdZ29MsTVDmU6w=
golang.org/x/net v0.0.0-20210324205630-d1beb07c2056/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

464
http.go
View File

@ -1,5 +1,5 @@
// Package http provides a http client // Package http provides a http client
package http // import "go.unistack.org/micro-client-http/v4" package http
import ( import (
"bufio" "bufio"
@ -10,61 +10,56 @@ import (
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"strings" "os"
"sync"
"time" "time"
"go.unistack.org/micro/v4/client" "github.com/unistack-org/micro/v3/broker"
"go.unistack.org/micro/v4/codec" "github.com/unistack-org/micro/v3/client"
"go.unistack.org/micro/v4/errors" "github.com/unistack-org/micro/v3/codec"
"go.unistack.org/micro/v4/logger" "github.com/unistack-org/micro/v3/errors"
"go.unistack.org/micro/v4/metadata" "github.com/unistack-org/micro/v3/metadata"
"go.unistack.org/micro/v4/options" "github.com/unistack-org/micro/v3/router"
"go.unistack.org/micro/v4/selector"
rutil "go.unistack.org/micro/v4/util/reflect"
) )
var DefaultContentType = "application/json" var (
DefaultContentType = "application/json"
)
/*
func filterLabel(r []router.Route) []router.Route { func filterLabel(r []router.Route) []router.Route {
// selector.FilterLabel("protocol", "http") // selector.FilterLabel("protocol", "http")
return r return r
} }
*/
type httpClient struct { type httpClient struct {
httpcli *http.Client
opts client.Options opts client.Options
sync.RWMutex dialer *net.Dialer
httpcli *http.Client
init bool init bool
} }
func newRequest(ctx context.Context, log logger.Logger, addr string, req client.Request, ct string, cf codec.Codec, msg interface{}, opts client.CallOptions) (*http.Request, error) { func newRequest(addr string, req client.Request, ct string, cf codec.Codec, msg interface{}, opts client.CallOptions) (*http.Request, error) {
var tags []string hreq := &http.Request{Method: http.MethodPost}
var parameters map[string]map[string]string
scheme := "http"
method := http.MethodPost
body := "*" // as like google api http annotation body := "*" // as like google api http annotation
host := addr
path := req.Endpoint()
var tags []string
var scheme string
u, err := url.Parse(addr) u, err := url.Parse(addr)
if err == nil { if err != nil {
scheme = u.Scheme hreq.URL = &url.URL{
path = u.Path Scheme: "http",
host = u.Host Host: addr,
} else { Path: req.Endpoint(),
u = &url.URL{Scheme: scheme, Path: path, Host: host}
} }
hreq.Host = addr
// nolint: nestif scheme = "http"
} else {
ep := req.Endpoint()
if opts.Context != nil { if opts.Context != nil {
if m, ok := opts.Context.Value(methodKey{}).(string); ok { if m, ok := opts.Context.Value(methodKey{}).(string); ok {
method = m hreq.Method = m
} }
if p, ok := opts.Context.Value(pathKey{}).(string); ok { if p, ok := opts.Context.Value(pathKey{}).(string); ok {
path += p ep = p
} }
if b, ok := opts.Context.Value(bodyKey{}).(string); ok { if b, ok := opts.Context.Value(bodyKey{}).(string); ok {
body = b body = b
@ -72,31 +67,11 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client.
if t, ok := opts.Context.Value(structTagsKey{}).([]string); ok && len(t) > 0 { if t, ok := opts.Context.Value(structTagsKey{}).([]string); ok && len(t) > 0 {
tags = t tags = t
} }
if k, ok := opts.Context.Value(headerKey{}).([]string); ok && len(k) > 0 {
if parameters == nil {
parameters = make(map[string]map[string]string)
}
m, ok := parameters["header"]
if !ok {
m = make(map[string]string)
parameters["header"] = m
}
for idx := 0; idx < len(k)/2; idx += 2 {
m[k[idx]] = k[idx+1]
}
}
if k, ok := opts.Context.Value(cookieKey{}).([]string); ok && len(k) > 0 {
if parameters == nil {
parameters = make(map[string]map[string]string)
}
m, ok := parameters["cookie"]
if !ok {
m = make(map[string]string)
parameters["cookie"] = m
}
for idx := 0; idx < len(k)/2; idx += 2 {
m[k[idx]] = k[idx+1]
} }
hreq.URL, err = u.Parse(ep)
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
} }
} }
@ -109,167 +84,112 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client.
} }
} }
if path == "" { path, nmsg, err := newPathRequest(hreq.URL.Path, hreq.Method, body, msg, tags)
path = req.Endpoint()
}
u, err = u.Parse(path)
if err != nil { if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error()) return nil, errors.BadRequest("go.micro.client", err.Error())
} }
var nmsg interface{} if scheme != "" {
if len(u.Query()) > 0 { hreq.URL, err = url.Parse(scheme + "://" + addr + path)
path, nmsg, err = newPathRequest(u.Path+"?"+u.RawQuery, method, body, msg, tags, parameters)
} else { } else {
path, nmsg, err = newPathRequest(u.Path, method, body, msg, tags, parameters) hreq.URL, err = url.Parse(addr + path)
} }
if err != nil { if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error()) return nil, errors.BadRequest("go.micro.client", err.Error())
} }
u, err = url.Parse(fmt.Sprintf("%s://%s%s", scheme, host, path)) // marshal request is struct not empty
if nmsg != nil {
var b []byte
b, err = cf.Marshal(nmsg)
if err != nil { if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error()) return nil, errors.BadRequest("go.micro.client", err.Error())
} }
hreq.Body = ioutil.NopCloser(bytes.NewBuffer(b))
var cookies []*http.Cookie
header := make(http.Header)
if opts.Context != nil {
if md, ok := opts.Context.Value(metadataKey{}).(metadata.Metadata); ok {
for k, v := range md {
header[k] = v
}
}
}
if opts.AuthToken != "" {
header.Set(metadata.HeaderAuthorization, opts.AuthToken)
}
if opts.RequestMetadata != nil {
for k, v := range opts.RequestMetadata {
header[k] = v
}
}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md {
header[k] = v
}
}
// set timeout in nanoseconds
if opts.StreamTimeout > time.Duration(0) {
header.Set(metadata.HeaderTimeout, fmt.Sprintf("%d", opts.StreamTimeout))
}
if opts.RequestTimeout > time.Duration(0) {
header.Set(metadata.HeaderTimeout, fmt.Sprintf("%d", opts.RequestTimeout))
}
// set the content type for the request
header.Set(metadata.HeaderContentType, ct)
var v interface{}
for km, vm := range parameters {
for k, required := range vm {
v, err = rutil.StructFieldByPath(msg, k)
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
if rutil.IsZero(v) {
if required == "true" {
return nil, errors.BadRequest("go.micro.client", fmt.Sprintf("required field %s not set", k))
}
continue
}
switch km {
case "header":
header.Set(k, fmt.Sprintf("%v", v))
case "cookie":
cookies = append(cookies, &http.Cookie{Name: k, Value: fmt.Sprintf("%v", v)})
}
}
}
b, err := cf.Marshal(nmsg)
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
var hreq *http.Request
if len(b) > 0 {
hreq, err = http.NewRequestWithContext(ctx, method, u.String(), ioutil.NopCloser(bytes.NewBuffer(b)))
hreq.ContentLength = int64(len(b)) hreq.ContentLength = int64(len(b))
header.Set("Content-Length", fmt.Sprintf("%d", hreq.ContentLength))
} else {
hreq, err = http.NewRequestWithContext(ctx, method, u.String(), nil)
}
if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error())
}
hreq.Header = header
for _, cookie := range cookies {
hreq.AddCookie(cookie)
}
if log.V(logger.DebugLevel) {
log.Debug(ctx, fmt.Sprintf("request %s to %s with headers %v body %s", method, u.String(), hreq.Header, b))
} }
return hreq, nil return hreq, nil
} }
func (h *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (h *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
ct := req.ContentType() header := make(http.Header, 2)
if len(opts.ContentType) > 0 { if md, ok := metadata.FromContext(ctx); ok {
ct = opts.ContentType for k, v := range md {
header.Set(k, v)
}
} }
ct := req.ContentType()
// set timeout in nanoseconds
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout))
// set the content type for the request
header.Set("Content-Type", ct)
// get codec
cf, err := h.newCodec(ct) cf, err := h.newCodec(ct)
if err != nil { if err != nil {
return errors.BadRequest("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
hreq, err := newRequest(ctx, h.opts.Logger, addr, req, ct, cf, req.Body(), opts)
hreq, err := newRequest(addr, req, ct, cf, req.Body(), opts)
if err != nil { if err != nil {
return err return err
} }
hreq.Header = header
// make the request // make the request
hrsp, err := h.httpcli.Do(hreq) hrsp, err := h.httpcli.Do(hreq.WithContext(ctx))
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case *url.Error:
if err, ok := err.Err.(net.Error); ok && err.Timeout() {
return errors.Timeout("go.micro.client", err.Error())
}
case net.Error: case net.Error:
if err.Timeout() { if err.Timeout() {
return errors.Timeout("go.micro.client", err.Error()) return errors.Timeout("go.micro.client", err.Error())
} }
case *url.Error:
if err, ok := err.Err.(net.Error); ok && err.Timeout() {
return errors.Timeout("go.micro.client", err.Error())
}
} }
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
defer hrsp.Body.Close() defer hrsp.Body.Close()
return h.parseRsp(ctx, hrsp, rsp, opts) return parseRsp(ctx, hrsp, cf, rsp, opts)
} }
func (h *httpClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) { func (h *httpClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) {
ct := req.ContentType() var header http.Header
if len(opts.ContentType) > 0 {
ct = opts.ContentType if md, ok := metadata.FromContext(ctx); ok {
header = make(http.Header, len(md)+2)
for k, v := range md {
header.Set(k, v)
} }
} else {
header = make(http.Header, 2)
}
ct := req.ContentType()
// set timeout in nanoseconds
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout))
// set the content type for the request
header.Set("Content-Type", ct)
// get codec // get codec
cf, err := h.newCodec(ct) cf, err := h.newCodec(req.ContentType())
if err != nil { if err != nil {
return nil, errors.BadRequest("go.micro.client", err.Error()) return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
dialAddr := addr
u, err := url.Parse(dialAddr)
if err == nil && u.Scheme != "" && u.Host != "" {
dialAddr = u.Host
}
cc, err := (h.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr) cc, err := (h.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err))
@ -277,26 +197,19 @@ func (h *httpClient) stream(ctx context.Context, addr string, req client.Request
return &httpStream{ return &httpStream{
address: addr, address: addr,
logger: h.opts.Logger,
context: ctx, context: ctx,
closed: make(chan bool), closed: make(chan bool),
opts: opts, opts: opts,
conn: cc, conn: cc,
ct: ct, ct: ct,
cf: cf, cf: cf,
header: header,
reader: bufio.NewReader(cc), reader: bufio.NewReader(cc),
request: req, request: req,
}, nil }, nil
} }
func (h *httpClient) newCodec(ct string) (codec.Codec, error) { func (h *httpClient) newCodec(ct string) (codec.Codec, error) {
h.RLock()
defer h.RUnlock()
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}
if c, ok := h.opts.Codecs[ct]; ok { if c, ok := h.opts.Codecs[ct]; ok {
return c, nil return c, nil
} }
@ -304,7 +217,7 @@ func (h *httpClient) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType return nil, codec.ErrUnknownContentType
} }
func (h *httpClient) Init(opts ...options.Option) error { func (h *httpClient) Init(opts ...client.Option) error {
if len(opts) == 0 && h.init { if len(opts) == 0 && h.init {
return nil return nil
} }
@ -312,6 +225,9 @@ func (h *httpClient) Init(opts ...options.Option) error {
o(&h.opts) o(&h.opts)
} }
if err := h.opts.Broker.Init(); err != nil {
return err
}
if err := h.opts.Tracer.Init(); err != nil { if err := h.opts.Tracer.Init(); err != nil {
return err return err
} }
@ -324,6 +240,9 @@ func (h *httpClient) Init(opts ...options.Option) error {
if err := h.opts.Meter.Init(); err != nil { if err := h.opts.Meter.Init(); err != nil {
return err return err
} }
if err := h.opts.Transport.Init(); err != nil {
return err
}
return nil return nil
} }
@ -332,11 +251,15 @@ func (h *httpClient) Options() client.Options {
return h.opts return h.opts
} }
func (h *httpClient) NewRequest(service, method string, req interface{}, opts ...options.Option) client.Request { func (h *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
return newHTTPRequest(service, method, req, h.opts.ContentType, opts...) return newHTTPMessage(topic, msg, h.opts.ContentType, opts...)
} }
func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...options.Option) error { func (h *httpClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return newHTTPRequest(service, method, req, h.opts.ContentType, reqOpts...)
}
func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// make a copy of call opts // make a copy of call opts
callOpts := h.opts.CallOptions callOpts := h.opts.CallOptions
for _, opt := range opts { for _, opt := range opts {
@ -353,9 +276,8 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
} else { } else {
// got a deadline so no need to setup context // got a deadline so no need to setup context
// but we need to set the timeout we pass along // but we need to set the timeout we pass along
if err := options.Set(&callOpts, time.Until(d), ".RequestTimeout"); err != nil { opt := client.WithRequestTimeout(d.Sub(time.Now()))
return errors.New("go.micro.client", fmt.Sprintf("%v", err.Error()), 400) opt(&callOpts)
}
} }
// should we noop right here? // should we noop right here?
@ -369,9 +291,9 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
hcall := h.call hcall := h.call
// wrap the call in reverse // wrap the call in reverse
//for i := len(callOpts.CallWrappers); i > 0; i-- { for i := len(callOpts.CallWrappers); i > 0; i-- {
// hcall = callOpts.CallWrappers[i-1](hcall) hcall = callOpts.CallWrappers[i-1](hcall)
//} }
// use the router passed as a call option, or fallback to the rpc clients router // use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil { if callOpts.Router == nil {
@ -388,7 +310,18 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
callOpts.Address = []string{h.opts.Proxy} callOpts.Address = []string{h.opts.Proxy}
} }
var next selector.Next // lookup the route to send the reques to
// TODO apply any filtering here
routes, err := h.opts.Lookup(ctx, req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return err
}
// return errors.New("go.micro.client", "request timeout", 408) // return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error { call := func(i int) error {
@ -403,22 +336,6 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
time.Sleep(t) time.Sleep(t)
} }
if next == nil {
var routes []string
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err = h.opts.Lookup(ctx, req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err = callOpts.Selector.Select(routes)
if err != nil {
return err
}
}
node := next() node := next()
// make the call // make the call
@ -469,28 +386,25 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
return gerr return gerr
} }
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...options.Option) (client.Stream, error) { func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
var err error
// make a copy of call opts // make a copy of call opts
callOpts := h.opts.CallOptions callOpts := h.opts.CallOptions
for _, o := range opts { for _, opt := range opts {
o(&callOpts) opt(&callOpts)
} }
// check if we already have a deadline // check if we already have a deadline
d, ok := ctx.Deadline() d, ok := ctx.Deadline()
if !ok && callOpts.StreamTimeout > time.Duration(0) { if !ok {
var cancel context.CancelFunc var cancel context.CancelFunc
// no deadline so we create a new one // no deadline so we create a new one
ctx, cancel = context.WithTimeout(ctx, callOpts.StreamTimeout) ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
defer cancel() defer cancel()
} else { } else {
// got a deadline so no need to setup context // got a deadline so no need to setup context
// but we need to set the timeout we pass along // but we need to set the timeout we pass along
if err = options.Set(&callOpts, time.Until(d), ".StreamTimeout"); err != nil { opt := client.WithRequestTimeout(d.Sub(time.Now()))
return nil, errors.New("go.micro.client", fmt.Sprintf("%v", err.Error()), 400) opt(&callOpts)
}
} }
// should we noop right here? // should we noop right here?
@ -502,7 +416,10 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
/* /*
// make copy of call method // make copy of call method
hstream := h.stream hstream, err := h.stream()
if err != nil {
return nil, err
}
// wrap the call in reverse // wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- { for i := len(callOpts.CallWrappers); i > 0; i-- {
hstream = callOpts.CallWrappers[i-1](hstream) hstream = callOpts.CallWrappers[i-1](hstream)
@ -524,13 +441,24 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
callOpts.Address = []string{h.opts.Proxy} callOpts.Address = []string{h.opts.Proxy}
} }
var next selector.Next // lookup the route to send the reques to
// TODO apply any filtering here
routes, err := h.opts.Lookup(ctx, req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return nil, err
}
call := func(i int) (client.Stream, error) { call := func(i int) (client.Stream, error) {
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, cerr := callOpts.Backoff(ctx, req, i) t, err := callOpts.Backoff(ctx, req, i)
if cerr != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", cerr.Error()) return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
// only sleep if greater than 0 // only sleep if greater than 0
@ -538,37 +466,21 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
time.Sleep(t) time.Sleep(t)
} }
if next == nil {
var routes []string
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err = h.opts.Lookup(ctx, req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err = callOpts.Selector.Select(routes)
if err != nil {
return nil, err
}
}
node := next() node := next()
stream, cerr := h.stream(ctx, node, req, callOpts) stream, err := h.stream(ctx, node, req, callOpts)
// record the result of the call to inform future routing decisions // record the result of the call to inform future routing decisions
if verr := h.opts.Selector.Record(node, cerr); verr != nil { if verr := h.opts.Selector.Record(node, err); verr != nil {
return nil, verr return nil, verr
} }
// try and transform the error to a go-micro error // try and transform the error to a go-micro error
if verr, ok := cerr.(*errors.Error); ok { if verr, ok := err.(*errors.Error); ok {
return nil, verr return nil, verr
} }
return stream, cerr return stream, err
} }
type response struct { type response struct {
@ -581,8 +493,8 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
for i := 0; i <= callOpts.Retries; i++ { for i := 0; i <= callOpts.Retries; i++ {
go func() { go func() {
s, cerr := call(i) s, err := call(i)
ch <- response{s, cerr} ch <- response{s, err}
}() }()
select { select {
@ -610,6 +522,52 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...opt
return nil, grr return nil, grr
} }
func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
options := client.NewPublishOptions(opts...)
md, ok := metadata.FromContext(ctx)
if !ok {
md = metadata.New(2)
}
md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic()
cf, err := h.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
var body []byte
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data
} else {
b := bytes.NewBuffer(nil)
if err := cf.Write(b, &codec.Message{Type: codec.Event}, p.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b.Bytes()
}
topic := p.Topic()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
return h.opts.Broker.Publish(ctx, topic, &broker.Message{
Header: md,
Body: body,
}, broker.PublishContext(ctx))
}
func (h *httpClient) String() string { func (h *httpClient) String() string {
return "http" return "http"
} }
@ -618,7 +576,7 @@ func (h *httpClient) Name() string {
return h.opts.Name return h.opts.Name
} }
func NewClient(opts ...options.Option) client.Client { func NewClient(opts ...client.Option) client.Client {
options := client.NewOptions(opts...) options := client.NewOptions(opts...)
if len(options.ContentType) == 0 { if len(options.ContentType) == 0 {
@ -629,33 +587,20 @@ func NewClient(opts ...options.Option) client.Client {
opts: options, opts: options,
} }
var dialer func(context.Context, string) (net.Conn, error) dialer, ok := options.Context.Value(httpDialerKey{}).(*net.Dialer)
if v, ok := options.Context.Value(httpDialerKey{}).(*net.Dialer); ok { if !ok {
dialer = func(ctx context.Context, addr string) (net.Conn, error) { dialer = &net.Dialer{
return v.DialContext(ctx, "tcp", addr)
}
}
if options.ContextDialer != nil {
dialer = options.ContextDialer
}
if dialer == nil {
dialer = func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{
Timeout: 30 * time.Second, Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second, KeepAlive: 30 * time.Second,
}).DialContext(ctx, "tcp", addr)
} }
} }
if httpcli, ok := options.Context.Value(httpClientKey{}).(*http.Client); ok { if httpcli, ok := options.Context.Value(httpClientKey{}).(*http.Client); ok {
rc.httpcli = httpcli rc.httpcli = httpcli
} else { } else {
// TODO customTransport := http.DefaultTransport.(*http.Transport).Clone()
tr := &http.Transport{ tr := &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { DialContext: dialer.DialContext,
return dialer(ctx, addr)
},
ForceAttemptHTTP2: true, ForceAttemptHTTP2: true,
MaxConnsPerHost: 100, MaxConnsPerHost: 100,
MaxIdleConns: 20, MaxIdleConns: 20,
@ -668,5 +613,10 @@ func NewClient(opts ...options.Option) client.Client {
} }
c := client.Client(rc) c := client.Client(rc)
// wrap in reverse
for i := len(options.Wrappers); i > 0; i-- {
c = options.Wrappers[i-1](c)
}
return c return c
} }

View File

@ -8,56 +8,14 @@ import (
type Request struct { type Request struct {
Name string `json:"name"` Name string `json:"name"`
Field1 string `json:"field1"` Field1 string
ClientID string
Field2 string Field2 string
Field3 int64 Field3 int64
} }
func TestPathWithHeader(t *testing.T) {
req := &Request{Name: "vtolstov", Field1: "field1", ClientID: "1234567890"}
p, m, err := newPathRequest(
"/api/v1/test?Name={name}&Field1={field1}",
"POST",
"*",
req,
nil,
map[string]map[string]string{"header": {"ClientID": "true"}},
)
if err != nil {
t.Fatal(err)
}
u, err := url.Parse(p)
if err != nil {
t.Fatal(err)
}
if m != nil {
t.Fatal("new struct must be nil")
}
if u.Query().Get("Name") != "vtolstov" || u.Query().Get("Field1") != "field1" {
t.Fatalf("invalid values %v", u.Query())
}
}
func TestPathValues(t *testing.T) {
req := &Request{Name: "vtolstov", Field1: "field1"}
p, m, err := newPathRequest("/api/v1/test?Name={name}&Field1={field1}", "POST", "*", req, nil, nil)
if err != nil {
t.Fatal(err)
}
u, err := url.Parse(p)
if err != nil {
t.Fatal(err)
}
_ = m
if u.Query().Get("Name") != "vtolstov" || u.Query().Get("Field1") != "field1" {
t.Fatalf("invalid values %v", u.Query())
}
}
func TestValidPath(t *testing.T) { func TestValidPath(t *testing.T) {
req := &Request{Name: "vtolstov", Field1: "field1", Field2: "field2", Field3: 10} req := &Request{Name: "vtolstov", Field1: "field1", Field2: "field2", Field3: 10}
p, m, err := newPathRequest("/api/v1/{name}/list", "GET", "", req, nil, nil) p, m, err := newPathRequest("/api/v1/{name}/list", "GET", "", req, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -74,8 +32,9 @@ func TestValidPath(t *testing.T) {
func TestInvalidPath(t *testing.T) { func TestInvalidPath(t *testing.T) {
req := &Request{Name: "vtolstov", Field1: "field1", Field2: "field2", Field3: 10} req := &Request{Name: "vtolstov", Field1: "field1", Field2: "field2", Field3: 10}
_, _, err := newPathRequest("/api/v1/{xname}/list", "GET", "", req, nil, nil) p, m, err := newPathRequest("/api/v1/{xname}/list", "GET", "", req, nil)
if err == nil { if err == nil {
t.Fatal("path param must not be filled") t.Fatalf("path param must not be filled")
} }
_, _ = p, m
} }

36
message.go Normal file
View File

@ -0,0 +1,36 @@
package http
import (
"github.com/unistack-org/micro/v3/client"
)
type httpMessage struct {
topic string
contentType string
payload interface{}
}
func newHTTPMessage(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
options := client.NewMessageOptions(opts...)
if len(options.ContentType) == 0 {
options.ContentType = contentType
}
return &httpMessage{
payload: payload,
topic: topic,
contentType: options.ContentType,
}
}
func (h *httpMessage) ContentType() string {
return h.contentType
}
func (h *httpMessage) Topic() string {
return h.topic
}
func (h *httpMessage) Payload() interface{} {
return h.payload
}

View File

@ -4,8 +4,7 @@ import (
"net" "net"
"net/http" "net/http"
"go.unistack.org/micro/v4/metadata" "github.com/unistack-org/micro/v3/client"
"go.unistack.org/micro/v4/options"
) )
var ( var (
@ -27,100 +26,70 @@ var (
) )
type poolMaxStreams struct{} type poolMaxStreams struct{}
// PoolMaxStreams maximum streams on a connectioin
func PoolMaxStreams(n int) options.Option {
return options.ContextOption(poolMaxStreams{}, n)
}
type poolMaxIdle struct{} type poolMaxIdle struct{}
type codecsKey struct{}
// PoolMaxIdle maximum idle conns of a pool type tlsAuth struct{}
func PoolMaxIdle(d int) options.Option {
return options.ContextOption(poolMaxIdle{}, d)
}
type maxRecvMsgSizeKey struct{} type maxRecvMsgSizeKey struct{}
// MaxRecvMsgSize set the maximum size of message that client can receive.
func MaxRecvMsgSize(s int) options.Option {
return options.ContextOption(maxRecvMsgSizeKey{}, s)
}
type maxSendMsgSizeKey struct{} type maxSendMsgSizeKey struct{}
// PoolMaxStreams maximum streams on a connectioin
func PoolMaxStreams(n int) client.Option {
return client.SetOption(poolMaxStreams{}, n)
}
// PoolMaxIdle maximum idle conns of a pool
func PoolMaxIdle(d int) client.Option {
return client.SetOption(poolMaxIdle{}, d)
}
// MaxRecvMsgSize set the maximum size of message that client can receive.
func MaxRecvMsgSize(s int) client.Option {
return client.SetOption(maxRecvMsgSizeKey{}, s)
}
// MaxSendMsgSize set the maximum size of message that client can send. // MaxSendMsgSize set the maximum size of message that client can send.
func MaxSendMsgSize(s int) options.Option { func MaxSendMsgSize(s int) client.Option {
return options.ContextOption(maxSendMsgSizeKey{}, s) return client.SetOption(maxSendMsgSizeKey{}, s)
} }
type httpClientKey struct{} type httpClientKey struct{}
// nolint: golint func HTTPClient(c *http.Client) client.Option {
// HTTPClient pass http.Client option to client Call return client.SetOption(httpClientKey{}, c)
func HTTPClient(c *http.Client) options.Option {
return options.ContextOption(httpClientKey{}, c)
} }
type httpDialerKey struct{} type httpDialerKey struct{}
// nolint: golint func HTTPDialer(d *net.Dialer) client.Option {
// HTTPDialer pass net.Dialer option to client return client.SetOption(httpDialerKey{}, d)
func HTTPDialer(d *net.Dialer) options.Option {
return options.ContextOption(httpDialerKey{}, d)
} }
type methodKey struct{} type methodKey struct{}
// Method pass method option to client Call func Method(m string) client.CallOption {
func Method(m string) options.Option { return client.SetCallOption(methodKey{}, m)
return options.ContextOption(methodKey{}, m)
} }
type pathKey struct{} type pathKey struct{}
// Path spcecifies path option to client Call func Path(p string) client.CallOption {
func Path(p string) options.Option { return client.SetCallOption(pathKey{}, p)
return options.ContextOption(pathKey{}, p)
} }
type bodyKey struct{} type bodyKey struct{}
// Body specifies body option to client Call func Body(b string) client.CallOption {
func Body(b string) options.Option { return client.SetCallOption(bodyKey{}, b)
return options.ContextOption(bodyKey{}, b)
} }
type errorMapKey struct{} type errorMapKey struct{}
func ErrorMap(m map[string]interface{}) options.Option { func ErrorMap(m map[string]interface{}) client.CallOption {
return options.ContextOption(errorMapKey{}, m) return client.SetCallOption(errorMapKey{}, m)
} }
type structTagsKey struct{} type structTagsKey struct{}
// StructTags pass tags slice option to client Call func StructTags(tags []string) client.CallOption {
func StructTags(tags []string) options.Option { return client.SetCallOption(structTagsKey{}, tags)
return options.ContextOption(structTagsKey{}, tags)
}
type metadataKey struct{}
// Metadata pass metadata to client Call
func Metadata(md metadata.Metadata) options.Option {
return options.ContextOption(metadataKey{}, md)
}
type cookieKey struct{}
// Cookie pass cookie to client Call
func Cookie(cookies ...string) options.Option {
return options.ContextOption(cookieKey{}, cookies)
}
type headerKey struct{}
// Header pass cookie to client Call
func Header(headers ...string) options.Option {
return options.ContextOption(headerKey{}, headers)
} }

View File

@ -1,9 +1,8 @@
package http package http
import ( import (
"go.unistack.org/micro/v4/client" "github.com/unistack-org/micro/v3/client"
"go.unistack.org/micro/v4/codec" "github.com/unistack-org/micro/v3/codec"
"go.unistack.org/micro/v4/options"
) )
type httpRequest struct { type httpRequest struct {
@ -14,8 +13,9 @@ type httpRequest struct {
opts client.RequestOptions opts client.RequestOptions
} }
func newHTTPRequest(service, method string, request interface{}, contentType string, opts ...options.Option) client.Request { func newHTTPRequest(service, method string, request interface{}, contentType string, opts ...client.RequestOption) client.Request {
options := client.NewRequestOptions(opts...) options := client.NewRequestOptions(opts...)
if len(options.ContentType) == 0 { if len(options.ContentType) == 0 {
options.ContentType = contentType options.ContentType = contentType
} }

104
stream.go
View File

@ -4,34 +4,35 @@ import (
"bufio" "bufio"
"context" "context"
"fmt" "fmt"
"io"
"net" "net"
"net/http" "net/http"
"sync" "sync"
"go.unistack.org/micro/v4/client" "github.com/unistack-org/micro/v3/client"
"go.unistack.org/micro/v4/codec" "github.com/unistack-org/micro/v3/codec"
"go.unistack.org/micro/v4/errors" "github.com/unistack-org/micro/v3/errors"
"go.unistack.org/micro/v4/logger"
) )
// Implements the streamer interface // Implements the streamer interface
type httpStream struct { type httpStream struct {
err error sync.RWMutex
conn net.Conn address string
opts client.CallOptions
ct string
cf codec.Codec cf codec.Codec
context context.Context context context.Context
logger logger.Logger header http.Header
request client.Request seq uint64
closed chan bool closed chan bool
err error
conn net.Conn
reader *bufio.Reader reader *bufio.Reader
address string request client.Request
ct string
opts client.CallOptions
sync.RWMutex
} }
var errShutdown = fmt.Errorf("connection is shut down") var (
errShutdown = fmt.Errorf("connection is shut down")
)
func (h *httpStream) isClosed() bool { func (h *httpStream) isClosed() bool {
select { select {
@ -54,10 +55,6 @@ func (h *httpStream) Response() client.Response {
return nil return nil
} }
func (h *httpStream) SendMsg(msg interface{}) error {
return h.Send(msg)
}
func (h *httpStream) Send(msg interface{}) error { func (h *httpStream) Send(msg interface{}) error {
h.Lock() h.Lock()
defer h.Unlock() defer h.Unlock()
@ -67,16 +64,14 @@ func (h *httpStream) Send(msg interface{}) error {
return errShutdown return errShutdown
} }
hreq, err := newRequest(h.context, h.logger, h.address, h.request, h.ct, h.cf, msg, h.opts) hreq, err := newRequest(h.address, h.request, h.ct, h.cf, msg, h.opts)
if err != nil { if err != nil {
return err return err
} }
return hreq.Write(h.conn) hreq.Header = h.header
}
func (h *httpStream) RecvMsg(msg interface{}) error { return hreq.Write(h.conn)
return h.Recv(msg)
} }
func (h *httpStream) Recv(msg interface{}) error { func (h *httpStream) Recv(msg interface{}) error {
@ -94,7 +89,7 @@ func (h *httpStream) Recv(msg interface{}) error {
} }
defer hrsp.Body.Close() defer hrsp.Body.Close()
return h.parseRsp(h.context, h.logger, hrsp, h.cf, msg, h.opts) return parseRsp(h.context, hrsp, h.cf, msg, h.opts)
} }
func (h *httpStream) Error() error { func (h *httpStream) Error() error {
@ -103,10 +98,6 @@ func (h *httpStream) Error() error {
return h.err return h.err
} }
func (h *httpStream) CloseSend() error {
return h.Close()
}
func (h *httpStream) Close() error { func (h *httpStream) Close() error {
select { select {
case <-h.closed: case <-h.closed:
@ -116,60 +107,3 @@ func (h *httpStream) Close() error {
return h.conn.Close() return h.conn.Close()
} }
} }
func (h *httpStream) parseRsp(ctx context.Context, log logger.Logger, hrsp *http.Response, cf codec.Codec, rsp interface{}, opts client.CallOptions) error {
var err error
var buf []byte
// fast path return
if hrsp.StatusCode == http.StatusNoContent {
return nil
}
select {
case <-ctx.Done():
err = ctx.Err()
default:
if hrsp.Body != nil {
buf, err = io.ReadAll(hrsp.Body)
if err != nil {
if log.V(logger.ErrorLevel) {
log.Error(ctx, "failed to read body", err)
}
return errors.InternalServerError("go.micro.client", string(buf))
}
}
if log.V(logger.DebugLevel) {
log.Debug(ctx, fmt.Sprintf("response %s with %v", buf, hrsp.Header))
}
if hrsp.StatusCode < 400 {
if err = cf.Unmarshal(buf, rsp); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
return nil
}
var rerr interface{}
errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{})
if ok && errmap != nil {
if rerr, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok {
rerr, ok = errmap["default"].(error)
}
}
if !ok || rerr == nil {
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
}
if cerr := cf.Unmarshal(buf, rerr); cerr != nil {
return errors.InternalServerError("go.micro.client", cerr.Error())
}
if err, ok = rerr.(error); !ok {
err = &Error{rerr}
}
}
return err
}

283
util.go
View File

@ -3,70 +3,38 @@ package http
import ( import (
"context" "context"
"fmt" "fmt"
"io" "io/ioutil"
"net/http" "net/http"
"net/url"
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
"go.unistack.org/micro/v4/client" "github.com/unistack-org/micro/v3/client"
"go.unistack.org/micro/v4/errors" "github.com/unistack-org/micro/v3/codec"
"go.unistack.org/micro/v4/logger" "github.com/unistack-org/micro/v3/errors"
"go.unistack.org/micro/v4/metadata" rutil "github.com/unistack-org/micro/v3/util/reflect"
rutil "go.unistack.org/micro/v4/util/reflect" util "github.com/unistack-org/micro/v3/util/router"
) )
var ( var (
templateCache = make(map[string][]string) templateCache = make(map[string]util.Template)
mu sync.RWMutex mu sync.RWMutex
) )
// Error struct holds error func newPathRequest(path string, method string, body string, msg interface{}, tags []string) (string, interface{}, error) {
type Error struct {
err interface{}
}
// Error func for error interface
func (err *Error) Error() string {
return fmt.Sprintf("%v", err.err)
}
func GetError(err error) interface{} {
if rerr, ok := err.(*Error); ok {
return rerr.err
}
return err
}
func newPathRequest(path string, method string, body string, msg interface{}, tags []string, parameters map[string]map[string]string) (string, interface{}, error) {
// parse via https://github.com/googleapis/googleapis/blob/master/google/api/http.proto definition // parse via https://github.com/googleapis/googleapis/blob/master/google/api/http.proto definition
tpl, err := newTemplate(path) tpl, err := newTemplate(path)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
if len(tpl) > 0 && msg == nil { if len(tpl.Fields) > 0 && msg == nil {
return "", nil, fmt.Errorf("nil message but path params requested: %v", path) return "", nil, fmt.Errorf("nil message but path params requested: %v", path)
} }
fieldsmapskip := make(map[string]struct{}) fieldsmap := make(map[string]string, len(tpl.Fields))
fieldsmap := make(map[string]string, len(tpl)) for _, v := range tpl.Fields {
for _, v := range tpl { fieldsmap[v] = ""
var vs, ve int
for i := 0; i < len(v); i++ {
switch v[i] {
case '{':
vs = i + 1
case '}':
ve = i
}
if ve != 0 {
fieldsmap[v[vs:ve]] = ""
vs = 0
ve = 0
}
}
} }
nmsg, err := rutil.Zero(msg) nmsg, err := rutil.Zero(msg)
@ -85,7 +53,7 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
tnmsg = tnmsg.Elem() tnmsg = tnmsg.Elem()
} }
values := url.Values{} values := make(map[string]string)
// copy cycle // copy cycle
for i := 0; i < tmsg.NumField(); i++ { for i := 0; i < tmsg.NumField(); i++ {
val := tmsg.Field(i) val := tmsg.Field(i)
@ -93,15 +61,7 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
continue continue
} }
fld := tmsg.Type().Field(i) fld := tmsg.Type().Field(i)
// Skip unexported fields.
if fld.PkgPath != "" {
continue
}
/* check for empty PkgPath can be replaced with new method IsExported
if !fld.IsExported() {
continue
}
*/
t := &tag{} t := &tag{}
for _, tn := range tags { for _, tn := range tags {
ts, ok := fld.Tag.Lookup(tn) ts, ok := fld.Tag.Lookup(tn)
@ -113,129 +73,67 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
// special // special
switch tn { switch tn {
case "protobuf": // special case "protobuf": // special
for _, p := range tp { t = &tag{key: tn, name: tp[3][5:], opts: append(tp[:3], tp[4:]...)}
if idx := strings.Index(p, "name="); idx > 0 {
t = &tag{key: tn, name: p[idx:]}
}
}
default: default:
t = &tag{key: tn, name: tp[0]} t = &tag{key: tn, name: tp[0], opts: tp[1:]}
} }
if t.name != "" { if t.name != "" {
break break
} }
} }
cname := t.name if t.name == "" {
if cname == "" {
cname = fld.Name
// fallback to lowercase // fallback to lowercase
t.name = strings.ToLower(fld.Name) t.name = strings.ToLower(fld.Name)
} }
if _, ok := parameters["header"][cname]; ok {
continue
}
if _, ok := parameters["cookie"][cname]; ok {
continue
}
if !val.IsValid() || val.IsZero() {
continue
}
// nolint: gocritic, nestif
if _, ok := fieldsmap[t.name]; ok { if _, ok := fieldsmap[t.name]; ok {
switch val.Type().Kind() { fieldsmap[t.name] = fmt.Sprintf("%v", val.Interface())
case reflect.Slice:
for idx := 0; idx < val.Len(); idx++ {
values.Add(t.name, getParam(val.Index(idx)))
}
fieldsmapskip[t.name] = struct{}{}
default:
fieldsmap[t.name] = getParam(val)
}
} else if (body == "*" || body == t.name) && method != http.MethodGet { } else if (body == "*" || body == t.name) && method != http.MethodGet {
if tnmsg.Field(i).CanSet() {
tnmsg.Field(i).Set(val) tnmsg.Field(i).Set(val)
}
} else { } else {
if val.Type().Kind() == reflect.Slice { values[t.name] = fmt.Sprintf("%v", val.Interface())
for idx := 0; idx < val.Len(); idx++ {
values.Add(t.name, getParam(val.Index(idx)))
}
} else {
values.Add(t.name, getParam(val))
}
} }
} }
// check not filled stuff // check not filled stuff
for k, v := range fieldsmap { for k, v := range fieldsmap {
_, ok := fieldsmapskip[k] if v == "" {
if !ok && v == "" {
return "", nil, fmt.Errorf("path param %s not filled", k) return "", nil, fmt.Errorf("path param %s not filled", k)
} }
} }
var b strings.Builder var b strings.Builder
for _, fld := range tpl.Pool {
for _, fld := range tpl {
_, _ = b.WriteRune('/') _, _ = b.WriteRune('/')
// nolint: nestif if v, ok := fieldsmap[fld]; ok {
var vs, ve, vf int _, _ = b.WriteString(v)
var pholder bool
for i := 0; i < len(fld); i++ {
switch fld[i] {
case '{':
vs = i + 1
case '}':
ve = i
}
// nolint: nestif
if vs > 0 && ve != 0 {
if vm, ok := fieldsmap[fld[vs:ve]]; ok {
if vm != "" {
_, _ = b.WriteString(fld[vf : vs-1])
_, _ = b.WriteString(vm)
vf = ve + 1
}
} else { } else {
_, _ = b.WriteString(fld) _, _ = b.WriteString(fld)
} }
vs = 0
ve = 0
pholder = true
}
}
if !pholder {
_, _ = b.WriteString(fld)
}
} }
if len(values) > 0 { idx := 0
for k, v := range values {
if idx == 0 {
_, _ = b.WriteRune('?') _, _ = b.WriteRune('?')
_, _ = b.WriteString(values.Encode()) } else {
_, _ = b.WriteRune('&')
}
_, _ = b.WriteString(k)
_, _ = b.WriteRune('=')
_, _ = b.WriteString(v)
idx++
} }
if rutil.IsZero(nmsg) && !isEmptyStruct(nmsg) { if rutil.IsZero(nmsg) {
return b.String(), nil, nil return b.String(), nil, nil
} }
return b.String(), nmsg, nil return b.String(), nmsg, nil
} }
func isEmptyStruct(v interface{}) bool { func newTemplate(path string) (util.Template, error) {
val := reflect.ValueOf(v)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
return val.Kind() == reflect.Struct && val.NumField() == 0
}
func newTemplate(path string) ([]string, error) {
if len(path) == 0 || path[0] != '/' {
return nil, fmt.Errorf("path must starts with /")
}
mu.RLock() mu.RLock()
tpl, ok := templateCache[path] tpl, ok := templateCache[path]
if ok { if ok {
@ -244,7 +142,12 @@ func newTemplate(path string) ([]string, error) {
} }
mu.RUnlock() mu.RUnlock()
tpl = strings.Split(path[1:], "/") rule, err := util.Parse(path)
if err != nil {
return tpl, err
}
tpl = rule.Compile()
mu.Lock() mu.Lock()
templateCache[path] = tpl templateCache[path] = tpl
mu.Unlock() mu.Unlock()
@ -252,113 +155,43 @@ func newTemplate(path string) ([]string, error) {
return tpl, nil return tpl, nil
} }
func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp interface{}, opts client.CallOptions) error { func parseRsp(ctx context.Context, hrsp *http.Response, cf codec.Codec, rsp interface{}, opts client.CallOptions) error {
var err error b, err := ioutil.ReadAll(hrsp.Body)
var buf []byte
// fast path return
if hrsp.StatusCode == http.StatusNoContent {
return nil
}
if opts.ResponseMetadata != nil {
*opts.ResponseMetadata = metadata.New(len(hrsp.Header))
for k, v := range hrsp.Header {
opts.ResponseMetadata.Set(k, strings.Join(v, ","))
}
}
select {
case <-ctx.Done():
err = ctx.Err()
default:
ct := DefaultContentType
if htype := hrsp.Header.Get("Content-Type"); htype != "" {
ct = htype
}
if hrsp.Body != nil {
buf, err = io.ReadAll(hrsp.Body)
if err != nil { if err != nil {
if h.opts.Logger.V(logger.ErrorLevel) { return errors.InternalServerError("go.micro.client", err.Error())
h.opts.Logger.Error(ctx, "failed to read body", err)
}
return errors.InternalServerError("go.micro.client", string(buf))
}
} }
cf, cerr := h.newCodec(ct)
if cerr != nil {
if h.opts.Logger.V(logger.DebugLevel) {
h.opts.Logger.Debug(ctx, fmt.Sprintf("response with %v unknown content-type %s %s", hrsp.Header, ct, buf))
}
return errors.InternalServerError("go.micro.client", cerr.Error())
}
if h.opts.Logger.V(logger.DebugLevel) {
h.opts.Logger.Debug(ctx, fmt.Sprintf("response %s with %v", buf, hrsp.Header))
}
// succeseful response
if hrsp.StatusCode < 400 { if hrsp.StatusCode < 400 {
if err = cf.Unmarshal(buf, rsp); err != nil { // unmarshal
if err := cf.Unmarshal(b, rsp); err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
return nil return nil
} }
// response with error
var rerr interface{}
errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{}) errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{})
if ok && errmap != nil { if !ok || errmap == nil {
rerr, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)] // user not provide map of errors
// id: req.Service() ??
return errors.New("go.micro.client", string(b), int32(hrsp.StatusCode))
}
if err, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok {
err, ok = errmap["default"].(error)
}
if !ok { if !ok {
rerr, ok = errmap["default"] return errors.New("go.micro.client", string(b), int32(hrsp.StatusCode))
}
} }
if !ok || rerr == nil { if cerr := cf.Unmarshal(b, err); cerr != nil {
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
}
if cerr := cf.Unmarshal(buf, rerr); cerr != nil {
return errors.InternalServerError("go.micro.client", cerr.Error()) return errors.InternalServerError("go.micro.client", cerr.Error())
} }
if err, ok = rerr.(error); !ok {
err = &Error{rerr}
}
}
return err return err
} }
type tag struct { type tag struct {
key string key string
name string name string
} opts []string
func getParam(val reflect.Value) string {
var v string
switch val.Kind() {
case reflect.Ptr:
switch reflect.Indirect(val).Type().String() {
case
"wrapperspb.BoolValue",
"wrapperspb.BytesValue",
"wrapperspb.DoubleValue",
"wrapperspb.FloatValue",
"wrapperspb.Int32Value", "wrapperspb.Int64Value",
"wrapperspb.StringValue",
"wrapperspb.UInt32Value", "wrapperspb.UInt64Value":
if eva := reflect.Indirect(val).FieldByName("Value"); eva.IsValid() {
v = getParam(eva)
}
}
default:
v = fmt.Sprintf("%v", val.Interface())
}
return v
} }

View File

@ -5,46 +5,28 @@ import (
"testing" "testing"
) )
func TestParsing(t *testing.T) { func TestTemplate(t *testing.T) {
type Message struct { tpl, err := newTemplate("/v1/{ClientID}/list")
IIN string `protobuf:"bytes,1,opt,name=iin,proto3" json:"iin"`
}
omsg := &Message{IIN: "5555"}
for _, m := range []string{"POST"} {
body := ""
path, nmsg, err := newPathRequest("/users/iin/{iin}/push-notifications", m, body, omsg, []string{"protobuf", "json"}, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
u, err := url.Parse(path) _ = tpl
if err != nil { // fmt.Printf("%#+v\n", tpl.Pool)
t.Fatal(err)
}
_ = nmsg
if u.Path != "/users/iin/5555/push-notifications" {
t.Fatalf("newPathRequest invalid path %s", u.Path)
}
if nmsg != nil {
t.Fatalf("new message must be nil: %v\n", nmsg)
}
}
} }
func TestNewPathRequest(t *testing.T) { func TestNewPathRequest(t *testing.T) {
type Message struct { type Message struct {
Name string `json:"name"` Name string `json:"name"`
Val1 string `protobuf:"bytes,1,opt,name=val1,proto3" json:"val1"` Val1 string `protobuf:"bytes,1,opt,name=val1,proto3" json:"val1"`
Val3 []string
Val2 int64 Val2 int64
Val3 []string
} }
omsg := &Message{Name: "test_name", Val1: "test_val1", Val2: 100, Val3: []string{"slice"}} omsg := &Message{Name: "test_name", Val1: "test_val1", Val2: 100, Val3: []string{"slice"}}
for _, m := range []string{"POST", "PUT", "PATCH", "GET", "DELETE"} { for _, m := range []string{"POST", "PUT", "PATCH", "GET", "DELETE"} {
body := "" body := ""
path, nmsg, err := newPathRequest("/v1/test", m, body, omsg, []string{"protobuf", "json"}, nil) path, nmsg, err := newPathRequest("/v1/test", m, body, omsg, []string{"protobuf", "json"})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -59,44 +41,12 @@ func TestNewPathRequest(t *testing.T) {
} }
} }
func TestNewPathRequestWithEmptyBody(t *testing.T) {
val := struct{}{}
cases := []string{
"",
"*",
"{}",
"nil",
`{"type": "invalid"}`,
}
for _, body := range cases {
for _, m := range []string{"POST", "PUT", "PATCH", "GET", "DELETE"} {
path, nmsg, err := newPathRequest("/v1/test", m, body, val, []string{"protobuf", "json"}, nil)
if err != nil {
t.Fatal(err)
}
if nmsg == nil {
t.Fatalf("invalid path: nil nmsg")
}
u, err := url.Parse(path)
if err != nil {
t.Fatal(err)
}
vals := u.Query()
if len(vals) != 0 {
t.Fatalf("invalid path: %v nmsg: %v", path, nmsg)
}
}
}
}
func TestNewPathVarRequest(t *testing.T) { func TestNewPathVarRequest(t *testing.T) {
type Message struct { type Message struct {
Name string `json:"name"` Name string `json:"name"`
Val1 string `protobuf:"bytes,1,opt,name=val1,proto3" json:"val1"` Val1 string `protobuf:"bytes,1,opt,name=val1,proto3" json:"val1"`
Val3 []string
Val2 int64 Val2 int64
Val3 []string
} }
omsg := &Message{Name: "test_name", Val1: "test_val1", Val2: 100, Val3: []string{"slice"}} omsg := &Message{Name: "test_name", Val1: "test_val1", Val2: 100, Val3: []string{"slice"}}
@ -106,7 +56,7 @@ func TestNewPathVarRequest(t *testing.T) {
if m != "GET" { if m != "GET" {
body = "*" body = "*"
} }
path, nmsg, err := newPathRequest("/v1/test/{val1}", m, body, omsg, []string{"protobuf", "json"}, nil) path, nmsg, err := newPathRequest("/v1/test/{val1}", m, body, omsg, []string{"protobuf", "json"})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }