Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
372bb37779 | |||
52bbed93c4 | |||
0706a76f3c | |||
841f688ce6 |
8
.github/ISSUE_TEMPLATE/bug_report.md
vendored
8
.github/ISSUE_TEMPLATE/bug_report.md
vendored
@@ -1,6 +1,6 @@
|
|||||||
---
|
---
|
||||||
name: Bug report
|
name: Bug report
|
||||||
about: For reporting bugs in go-micro
|
about: For reporting bugs in micro
|
||||||
title: "[BUG]"
|
title: "[BUG]"
|
||||||
labels: ''
|
labels: ''
|
||||||
assignees: ''
|
assignees: ''
|
||||||
@@ -16,9 +16,3 @@ assignees: ''
|
|||||||
**How to reproduce the bug:**
|
**How to reproduce the bug:**
|
||||||
|
|
||||||
If possible, please include a minimal code snippet here.
|
If possible, please include a minimal code snippet here.
|
||||||
|
|
||||||
**Environment:**
|
|
||||||
Go Version: please paste `go version` output here
|
|
||||||
```
|
|
||||||
please paste `go env` output here
|
|
||||||
```
|
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
---
|
---
|
||||||
name: Feature request / Enhancement
|
name: Feature request / Enhancement
|
||||||
about: If you have a need not served by go-micro
|
about: If you have a need not served by micro
|
||||||
title: "[FEATURE]"
|
title: "[FEATURE]"
|
||||||
labels: ''
|
labels: ''
|
||||||
assignees: ''
|
assignees: ''
|
||||||
|
8
.github/ISSUE_TEMPLATE/question.md
vendored
8
.github/ISSUE_TEMPLATE/question.md
vendored
@@ -1,14 +1,8 @@
|
|||||||
---
|
---
|
||||||
name: Question
|
name: Question
|
||||||
about: Ask a question about go-micro
|
about: Ask a question about micro
|
||||||
title: ''
|
title: ''
|
||||||
labels: ''
|
labels: ''
|
||||||
assignees: ''
|
assignees: ''
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
Before asking, please check if your question has already been answered:
|
|
||||||
|
|
||||||
1. Check the documentation - https://micro.mu/docs/
|
|
||||||
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
|
|
||||||
3. Search existing issues
|
|
||||||
|
28
.github/autoapprove.yml
vendored
Normal file
28
.github/autoapprove.yml
vendored
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
name: "autoapprove"
|
||||||
|
|
||||||
|
on:
|
||||||
|
pull_request_target:
|
||||||
|
types: [assigned, opened, synchronize, reopened]
|
||||||
|
workflow_run:
|
||||||
|
workflows: ["prbuild"]
|
||||||
|
types:
|
||||||
|
- completed
|
||||||
|
|
||||||
|
permissions:
|
||||||
|
pull-requests: write
|
||||||
|
contents: write
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
autoapprove:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: approve
|
||||||
|
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
|
||||||
|
"chmod +x ./tea",
|
||||||
|
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
|
||||||
|
"./tea pr --repo ${{ github.event.repository.name }}"
|
||||||
|
]
|
||||||
|
if: github.actor == 'vtolstov'
|
||||||
|
id: approve
|
||||||
|
with:
|
||||||
|
github-token: ${{ secrets.GITHUB_TOKEN }}
|
48
.github/workflows/job_sync.yml
vendored
48
.github/workflows/job_sync.yml
vendored
@@ -18,34 +18,76 @@ jobs:
|
|||||||
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
|
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
|
||||||
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
|
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
|
||||||
|
|
||||||
|
- name: check master
|
||||||
|
id: check_master
|
||||||
|
run: |
|
||||||
|
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
|
||||||
|
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
|
||||||
|
echo "src_hash=$src_hash"
|
||||||
|
echo "dst_hash=$dst_hash"
|
||||||
|
if [ "$src_hash" != "$dst_hash" ]; then
|
||||||
|
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
||||||
|
else
|
||||||
|
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
||||||
|
fi
|
||||||
|
|
||||||
- name: sync master
|
- name: sync master
|
||||||
|
if: steps.check_master.outputs.sync_needed == 'true'
|
||||||
run: |
|
run: |
|
||||||
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||||
cd repo
|
cd repo
|
||||||
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
|
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
|
||||||
git merge upstream/master
|
git pull --rebase upstream master
|
||||||
git push upstream master --progress
|
git push upstream master --progress
|
||||||
git push origin master --progress
|
git push origin master --progress
|
||||||
cd ../
|
cd ../
|
||||||
rm -rf repo
|
rm -rf repo
|
||||||
|
|
||||||
|
- name: check v3
|
||||||
|
id: check_v3
|
||||||
|
run: |
|
||||||
|
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
|
||||||
|
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
|
||||||
|
echo "src_hash=$src_hash"
|
||||||
|
echo "dst_hash=$dst_hash"
|
||||||
|
if [ "$src_hash" != "$dst_hash" ]; then
|
||||||
|
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
||||||
|
else
|
||||||
|
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
||||||
|
fi
|
||||||
|
|
||||||
- name: sync v3
|
- name: sync v3
|
||||||
|
if: steps.check_v3.outputs.sync_needed == 'true'
|
||||||
run: |
|
run: |
|
||||||
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||||
cd repo
|
cd repo
|
||||||
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
|
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||||
git merge upstream/v3
|
git pull --rebase upstream v3
|
||||||
git push upstream v3 --progress
|
git push upstream v3 --progress
|
||||||
git push origin v3 --progress
|
git push origin v3 --progress
|
||||||
cd ../
|
cd ../
|
||||||
rm -rf repo
|
rm -rf repo
|
||||||
|
|
||||||
|
- name: check v4
|
||||||
|
id: check_v4
|
||||||
|
run: |
|
||||||
|
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
|
||||||
|
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
|
||||||
|
echo "src_hash=$src_hash"
|
||||||
|
echo "dst_hash=$dst_hash"
|
||||||
|
if [ "$src_hash" != "$dst_hash" ]; then
|
||||||
|
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
||||||
|
else
|
||||||
|
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
||||||
|
fi
|
||||||
|
|
||||||
- name: sync v4
|
- name: sync v4
|
||||||
|
if: steps.check_v4.outputs.sync_needed == 'true'
|
||||||
run: |
|
run: |
|
||||||
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||||
cd repo
|
cd repo
|
||||||
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
|
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||||
git merge upstream/v4
|
git pull --rebase upstream v4
|
||||||
git push upstream v4 --progress
|
git push upstream v4 --progress
|
||||||
git push origin v4 --progress
|
git push origin v4 --progress
|
||||||
cd ../
|
cd ../
|
||||||
|
38
grpc.go
38
grpc.go
@@ -38,8 +38,8 @@ type grpcClient struct {
|
|||||||
funcStream client.FuncStream
|
funcStream client.FuncStream
|
||||||
pool *ConnPool
|
pool *ConnPool
|
||||||
opts client.Options
|
opts client.Options
|
||||||
sync.RWMutex
|
mu sync.RWMutex
|
||||||
init bool
|
init bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// secure returns the dial option for whether its a secure or insecure connection
|
// secure returns the dial option for whether its a secure or insecure connection
|
||||||
@@ -74,12 +74,12 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||||
var header map[string][]string
|
var header gmetadata.MD
|
||||||
|
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
header = metadata.Copy(md).AsHTTP2()
|
header = metadata.Copy(md).AsHTTP2()
|
||||||
} else {
|
} else {
|
||||||
header = make(map[string][]string, 2)
|
header = make(gmetadata.MD, 4)
|
||||||
}
|
}
|
||||||
if opts.RequestMetadata != nil {
|
if opts.RequestMetadata != nil {
|
||||||
for k, v := range opts.RequestMetadata {
|
for k, v := range opts.RequestMetadata {
|
||||||
@@ -87,9 +87,11 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// set timeout in nanoseconds
|
// set timeout in nanoseconds
|
||||||
header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
|
if opts.RequestTimeout > time.Duration(0) {
|
||||||
header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
|
header.Set("grpc-timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
|
||||||
header["content-type"] = append(header["content-type"], req.ContentType())
|
header.Set("timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
|
||||||
|
}
|
||||||
|
header.Set("content-type", req.ContentType())
|
||||||
|
|
||||||
ctx = gmetadata.NewOutgoingContext(ctx, header)
|
ctx = gmetadata.NewOutgoingContext(ctx, header)
|
||||||
|
|
||||||
@@ -185,21 +187,19 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||||
var header map[string][]string
|
var header gmetadata.MD
|
||||||
|
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
header = metadata.Copy(md).AsHTTP2()
|
header = metadata.Copy(md).AsHTTP2()
|
||||||
} else {
|
} else {
|
||||||
header = make(map[string][]string)
|
header = make(gmetadata.MD, 4)
|
||||||
}
|
}
|
||||||
|
|
||||||
// set timeout in nanoseconds
|
if opts.RequestTimeout > time.Duration(0) {
|
||||||
if opts.StreamTimeout > time.Duration(0) {
|
header.Set("grpc-timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
|
||||||
header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
|
header.Set("timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
|
||||||
header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
|
|
||||||
}
|
}
|
||||||
// set the content type for the request
|
header.Set("content-type", req.ContentType())
|
||||||
header["content-type"] = append(header["content-type"], req.ContentType())
|
|
||||||
|
|
||||||
ctx = gmetadata.NewOutgoingContext(ctx, header)
|
ctx = gmetadata.NewOutgoingContext(ctx, header)
|
||||||
|
|
||||||
@@ -361,8 +361,8 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
|
func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
defer g.RUnlock()
|
defer g.mu.RUnlock()
|
||||||
|
|
||||||
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
||||||
ct = ct[:idx]
|
ct = ct[:idx]
|
||||||
@@ -398,10 +398,10 @@ func (g *grpcClient) Init(opts ...client.Option) error {
|
|||||||
|
|
||||||
// update pool configuration if the options changed
|
// update pool configuration if the options changed
|
||||||
if size != g.opts.PoolSize || ttl != g.opts.PoolTTL {
|
if size != g.opts.PoolSize || ttl != g.opts.PoolTTL {
|
||||||
g.pool.Lock()
|
g.pool.mu.Lock()
|
||||||
g.pool.size = g.opts.PoolSize
|
g.pool.size = g.opts.PoolSize
|
||||||
g.pool.ttl = int64(g.opts.PoolTTL.Seconds())
|
g.pool.ttl = int64(g.opts.PoolTTL.Seconds())
|
||||||
g.pool.Unlock()
|
g.pool.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
g.funcCall = g.fnCall
|
g.funcCall = g.fnCall
|
||||||
|
20
grpc_pool.go
20
grpc_pool.go
@@ -16,7 +16,7 @@ type ConnPool struct {
|
|||||||
ttl int64
|
ttl int64
|
||||||
maxStreams int
|
maxStreams int
|
||||||
maxIdle int
|
maxIdle int
|
||||||
sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type streamsPool struct {
|
type streamsPool struct {
|
||||||
@@ -64,7 +64,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
addr = addr[strings.Index(addr, ":")+3:]
|
addr = addr[strings.Index(addr, ":")+3:]
|
||||||
}
|
}
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
p.Lock()
|
p.mu.Lock()
|
||||||
sp, ok := p.conns[addr]
|
sp, ok := p.conns[addr]
|
||||||
if !ok {
|
if !ok {
|
||||||
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
|
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
|
||||||
@@ -125,10 +125,10 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
}
|
}
|
||||||
// a good conn
|
// a good conn
|
||||||
conn.streams++
|
conn.streams++
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
|
|
||||||
// nolint (TODO need fix) create new conn)
|
// nolint (TODO need fix) create new conn)
|
||||||
cc, err := grpc.DialContext(ctx, addr, opts...)
|
cc, err := grpc.DialContext(ctx, addr, opts...)
|
||||||
@@ -138,24 +138,24 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
|
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
|
||||||
|
|
||||||
// add conn to streams pool
|
// add conn to streams pool
|
||||||
p.Lock()
|
p.mu.Lock()
|
||||||
if sp.count < p.size {
|
if sp.count < p.size {
|
||||||
addConnAfter(conn, sp.head)
|
addConnAfter(conn, sp.head)
|
||||||
}
|
}
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Put(conn *PoolConn, err error) {
|
func (p *ConnPool) Put(conn *PoolConn, err error) {
|
||||||
p.Lock()
|
p.mu.Lock()
|
||||||
p, sp, created := conn.pool, conn.sp, conn.created
|
p, sp, created := conn.pool, conn.sp, conn.created
|
||||||
// try to add conn
|
// try to add conn
|
||||||
if !conn.in && sp.count < p.size {
|
if !conn.in && sp.count < p.size {
|
||||||
addConnAfter(conn, sp.head)
|
addConnAfter(conn, sp.head)
|
||||||
}
|
}
|
||||||
if !conn.in {
|
if !conn.in {
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
conn.ClientConn.Close()
|
conn.ClientConn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -173,13 +173,13 @@ func (p *ConnPool) Put(conn *PoolConn, err error) {
|
|||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
|
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
|
||||||
removeConn(conn)
|
removeConn(conn)
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
conn.ClientConn.Close()
|
conn.ClientConn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sp.idle++
|
sp.idle++
|
||||||
}
|
}
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *PoolConn) Close() {
|
func (conn *PoolConn) Close() {
|
||||||
|
20
stream.go
20
stream.go
@@ -19,8 +19,8 @@ type grpcStream struct {
|
|||||||
response client.Response
|
response client.Response
|
||||||
close func(err error)
|
close func(err error)
|
||||||
conn *PoolConn
|
conn *PoolConn
|
||||||
sync.RWMutex
|
mu sync.RWMutex
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) Context() context.Context {
|
func (g *grpcStream) Context() context.Context {
|
||||||
@@ -88,15 +88,15 @@ func (g *grpcStream) RecvMsg(msg interface{}) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) Error() error {
|
func (g *grpcStream) Error() error {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
defer g.RUnlock()
|
defer g.mu.RUnlock()
|
||||||
return g.err
|
return g.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) setError(e error) {
|
func (g *grpcStream) setError(e error) {
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.err = e
|
g.err = e
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the gRPC send stream
|
// Close the gRPC send stream
|
||||||
@@ -105,8 +105,8 @@ func (g *grpcStream) setError(e error) {
|
|||||||
// stream should still be able to receive after this function call
|
// stream should still be able to receive after this function call
|
||||||
// TODO: should the conn be closed in another way?
|
// TODO: should the conn be closed in another way?
|
||||||
func (g *grpcStream) Close() error {
|
func (g *grpcStream) Close() error {
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
defer g.Unlock()
|
defer g.mu.Unlock()
|
||||||
|
|
||||||
if g.closed {
|
if g.closed {
|
||||||
return nil
|
return nil
|
||||||
@@ -125,8 +125,8 @@ func (g *grpcStream) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) CloseSend() error {
|
func (g *grpcStream) CloseSend() error {
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
defer g.Unlock()
|
defer g.mu.Unlock()
|
||||||
|
|
||||||
if g.closed {
|
if g.closed {
|
||||||
return nil
|
return nil
|
||||||
|
Reference in New Issue
Block a user