Compare commits

..

4 Commits

Author SHA1 Message Date
603b855e2a Merge pull request 'dialopts fix' (#129) from dialoptsfix into v3
Some checks failed
build / test (push) Failing after 6s
build / lint (push) Failing after 5s
codeql / analyze (go) (push) Failing after 6s
Reviewed-on: #129
2023-06-23 12:24:12 +03:00
f2cfd562c3 dialopts fix
Some checks failed
autoapprove / autoapprove (pull_request) Failing after 6s
automerge / automerge (pull_request) Failing after 4s
dependabot-automerge / automerge (pull_request) Has been skipped
codeql / analyze (go) (pull_request) Failing after 6s
prbuild / test (pull_request) Failing after 6s
prbuild / lint (pull_request) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-06-23 12:23:44 +03:00
781cf3d719 Merge pull request 'change DialOptions signature' (#128) from quicfix into v3
Reviewed-on: #128
2023-06-22 23:44:11 +03:00
1b65507fe5 change DialOptions signature
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-06-22 23:43:38 +03:00
23 changed files with 357 additions and 1221 deletions

View File

@@ -1,29 +0,0 @@
name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run lint
uses: https://github.com/golangci/golangci-lint-action@v6
with:
version: 'latest'

View File

@@ -1,34 +0,0 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...

View File

@@ -1,53 +0,0 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: checkout tests
uses: actions/checkout@v4
with:
ref: master
filter: 'blob:none'
repository: unistack-org/micro-tests
path: micro-tests
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup go work
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
go work init
go work use .
go work use micro-tests
- name: setup deps
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: go get -v ./...
- name: run tests
env:
INTEGRATION_TESTS: yes
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
cd micro-tests
go test -mod readonly -v ./... || true

19
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,19 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

20
.github/workflows/autoapprove.yml vendored Normal file
View File

@@ -0,0 +1,20 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

21
.github/workflows/automerge.yml vendored Normal file
View File

@@ -0,0 +1,21 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

47
.github/workflows/build.yml vendored Normal file
View File

@@ -0,0 +1,47 @@
name: build
on:
push:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

78
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@@ -0,0 +1,78 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "codeql"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: checkout
uses: actions/checkout@v3
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning.
- name: init
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: analyze
uses: github/codeql-action/analyze@v2

View File

@@ -0,0 +1,27 @@
name: "dependabot-automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]'
steps:
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.3.6
with:
github-token: "${{ secrets.TOKEN }}"
- name: merge
id: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

47
.github/workflows/pr.yml vendored Normal file
View File

@@ -0,0 +1,47 @@
name: prbuild
on:
pull_request:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

24
.gitignore vendored
View File

@@ -1,24 +0,0 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

View File

@@ -1,5 +1,44 @@
run: run:
concurrency: 8 concurrency: 4
deadline: 5m deadline: 5m
issues-exit-code: 1 issues-exit-code: 1
tests: true tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

13
go.mod
View File

@@ -1,8 +1,17 @@
module go.unistack.org/micro-client-grpc/v3 module go.unistack.org/micro-client-grpc/v3
go 1.16 go 1.20
require ( require (
go.unistack.org/micro/v3 v3.10.22 go.unistack.org/micro/v3 v3.10.22
google.golang.org/grpc v1.52.3 google.golang.org/grpc v1.55.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/protobuf v1.30.0 // indirect
) )

1034
go.sum

File diff suppressed because it is too large Load Diff

33
grpc.go
View File

@@ -30,7 +30,7 @@ const (
) )
type grpcClient struct { type grpcClient struct {
pool *ConnPool pool *pool
opts client.Options opts client.Options
sync.RWMutex sync.RWMutex
init bool init bool
@@ -98,7 +98,6 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
maxRecvMsgSize := g.maxRecvMsgSizeValue() maxRecvMsgSize := g.maxRecvMsgSizeValue()
maxSendMsgSize := g.maxSendMsgSizeValue() maxSendMsgSize := g.maxSendMsgSizeValue()
cfgService := g.serviceConfig()
var grr error var grr error
@@ -117,9 +116,11 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize),
), ),
grpc.WithDefaultServiceConfig(cfgService),
} }
if opts := g.getGrpcDialOptions(g.opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
if opts := g.getGrpcDialOptions(opts.Context); opts != nil { if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
@@ -132,13 +133,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
} }
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
g.pool.Put(cc, grr) g.pool.release(cc, grr)
}() }()
ch := make(chan error, 1) ch := make(chan error, 1)
@@ -220,7 +221,6 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
maxRecvMsgSize := g.maxRecvMsgSizeValue() maxRecvMsgSize := g.maxRecvMsgSizeValue()
maxSendMsgSize := g.maxSendMsgSizeValue() maxSendMsgSize := g.maxSendMsgSizeValue()
cfgService := g.serviceConfig()
grpcDialOptions := []grpc.DialOption{ grpcDialOptions := []grpc.DialOption{
g.secure(addr), g.secure(addr),
@@ -228,7 +228,6 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize),
), ),
grpc.WithDefaultServiceConfig(cfgService),
} }
if opts := g.getGrpcDialOptions(opts.Context); opts != nil { if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
@@ -243,7 +242,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
} }
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@@ -276,7 +275,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// cancel the context // cancel the context
cancel() cancel()
// release the connection // release the connection
g.pool.Put(cc, err) g.pool.release(cc, err)
// now return the error // now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
@@ -304,7 +303,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
} }
// defer execution of release // defer execution of release
g.pool.Put(cc, err) g.pool.release(cc, err)
}, },
} }
@@ -373,17 +372,6 @@ func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType return nil, codec.ErrUnknownContentType
} }
func (g *grpcClient) serviceConfig() string {
if g.opts.Context == nil {
return DefaultServiceConfig
}
v := g.opts.Context.Value(serviceConfigKey{})
if v == nil {
return DefaultServiceConfig
}
return v.(string)
}
func (g *grpcClient) Init(opts ...client.Option) error { func (g *grpcClient) Init(opts ...client.Option) error {
if len(opts) == 0 && g.init { if len(opts) == 0 && g.init {
return nil return nil
@@ -840,7 +828,8 @@ func NewClient(opts ...client.Option) client.Client {
opts: options, opts: options,
} }
rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
c := client.Client(rc) c := client.Client(rc)
// wrap in reverse // wrap in reverse

View File

@@ -10,7 +10,7 @@ import (
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
) )
type ConnPool struct { type pool struct {
conns map[string]*streamsPool conns map[string]*streamsPool
size int size int
ttl int64 ttl int64
@@ -21,36 +21,36 @@ type ConnPool struct {
type streamsPool struct { type streamsPool struct {
// head of list // head of list
head *PoolConn head *poolConn
// busy conns list // busy conns list
busy *PoolConn busy *poolConn
// the siza of list // the siza of list
count int count int
// idle conn // idle conn
idle int idle int
} }
type PoolConn struct { type poolConn struct {
err error err error
*grpc.ClientConn *grpc.ClientConn
next *PoolConn next *poolConn
pool *ConnPool pool *pool
sp *streamsPool sp *streamsPool
pre *PoolConn pre *poolConn
addr string addr string
streams int streams int
created int64 created int64
in bool in bool
} }
func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool { func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
if ms <= 0 { if ms <= 0 {
ms = 1 ms = 1
} }
if idle < 0 { if idle < 0 {
idle = 0 idle = 0
} }
return &ConnPool{ return &pool{
size: size, size: size,
ttl: int64(ttl.Seconds()), ttl: int64(ttl.Seconds()),
maxStreams: ms, maxStreams: ms,
@@ -59,7 +59,7 @@ func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
} }
} }
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*PoolConn, error) { func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
if strings.HasPrefix(addr, "http") { if strings.HasPrefix(addr, "http") {
addr = addr[strings.Index(addr, ":")+3:] addr = addr[strings.Index(addr, ":")+3:]
} }
@@ -67,7 +67,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
p.Lock() p.Lock()
sp, ok := p.conns[addr] sp, ok := p.conns[addr]
if !ok { if !ok {
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0} sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0}
p.conns[addr] = sp p.conns[addr] = sp
} }
// while we have conns check streams and then return one // while we have conns check streams and then return one
@@ -135,7 +135,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false} conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
// add conn to streams pool // add conn to streams pool
p.Lock() p.Lock()
@@ -147,7 +147,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
return conn, nil return conn, nil
} }
func (p *ConnPool) Put(conn *PoolConn, err error) { func (p *pool) release(conn *poolConn, err error) {
p.Lock() p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn // try to add conn
@@ -182,11 +182,11 @@ func (p *ConnPool) Put(conn *PoolConn, err error) {
p.Unlock() p.Unlock()
} }
func (conn *PoolConn) Close() { func (conn *poolConn) Close() {
conn.pool.Put(conn, conn.err) conn.pool.release(conn, conn.err)
} }
func removeConn(conn *PoolConn) { func removeConn(conn *poolConn) {
if conn.pre != nil { if conn.pre != nil {
conn.pre.next = conn.next conn.pre.next = conn.next
} }
@@ -199,7 +199,7 @@ func removeConn(conn *PoolConn) {
conn.sp.count-- conn.sp.count--
} }
func addConnAfter(conn *PoolConn, after *PoolConn) { func addConnAfter(conn *poolConn, after *poolConn) {
conn.next = after.next conn.next = after.next
conn.pre = after conn.pre = after
if after.next != nil { if after.next != nil {

View File

@@ -10,7 +10,7 @@ import (
) )
var ( var (
// DefaultPoolMaxStreams maximum streams on a connection // DefaultPoolMaxStreams maximum streams on a connectioin
// (20) // (20)
DefaultPoolMaxStreams = 20 DefaultPoolMaxStreams = 20
@@ -25,9 +25,6 @@ var (
// DefaultMaxSendMsgSize maximum message that client can send // DefaultMaxSendMsgSize maximum message that client can send
// (4 MB). // (4 MB).
DefaultMaxSendMsgSize = 1024 * 1024 * 4 DefaultMaxSendMsgSize = 1024 * 1024 * 4
// DefaultServiceConfig enable load balancing
DefaultServiceConfig = `{"loadBalancingPolicy":"round_robin"}`
) )
type poolMaxStreams struct{} type poolMaxStreams struct{}
@@ -98,8 +95,8 @@ func MaxSendMsgSize(s int) client.Option {
type grpcDialOptions struct{} type grpcDialOptions struct{}
// DialOptions to be used to configure gRPC dial options // DialOptions to be used to configure gRPC dial options
func DialOptions(opts ...grpc.DialOption) client.CallOption { func DialOptions(opts ...grpc.DialOption) client.Option {
return func(o *client.CallOptions) { return func(o *client.Options) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }
@@ -118,14 +115,3 @@ func CallOptions(opts ...grpc.CallOption) client.CallOption {
o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts)
} }
} }
type serviceConfigKey struct{}
func ServiceConfig(str string) client.CallOption {
return func(options *client.CallOptions) {
if options.Context == nil {
options.Context = context.Background()
}
options.Context = context.WithValue(options.Context, serviceConfigKey{}, str)
}
}

View File

@@ -9,7 +9,7 @@ import (
) )
type response struct { type response struct {
conn *PoolConn conn *poolConn
stream grpc.ClientStream stream grpc.ClientStream
codec codec.Codec codec codec.Codec
} }

View File

@@ -17,7 +17,7 @@ type grpcStream struct {
request client.Request request client.Request
response client.Response response client.Response
close func(err error) close func(err error)
conn *PoolConn conn *poolConn
sync.RWMutex sync.RWMutex
closed bool closed bool
} }