Compare commits

...

4 Commits
v4.1.1 ... v4

Author SHA1 Message Date
372bb37779 [v4] hide access to internal mutex (#193)
All checks were successful
test / test (push) Successful in 2m51s
coverage / build (push) Successful in 46s
sync / sync (push) Successful in 10s
* changed embedded mutex to private field

* changed embedded mutex to private field
2025-05-25 01:14:55 +03:00
52bbed93c4 improve metadata handling
All checks were successful
coverage / build (push) Successful in 1m45s
test / test (push) Successful in 3m56s
sync / sync (push) Successful in 21s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-05-13 09:22:17 +03:00
0706a76f3c added commit hash check to avoid unnecessary repository cloning (#192)
All checks were successful
sync / sync (push) Successful in 8s
2025-05-05 19:25:48 +03:00
841f688ce6 fixup workflows
All checks were successful
sync / sync (push) Successful in 38s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-05-04 17:08:02 +03:00
8 changed files with 117 additions and 59 deletions

View File

@@ -1,6 +1,6 @@
--- ---
name: Bug report name: Bug report
about: For reporting bugs in go-micro about: For reporting bugs in micro
title: "[BUG]" title: "[BUG]"
labels: '' labels: ''
assignees: '' assignees: ''
@@ -16,9 +16,3 @@ assignees: ''
**How to reproduce the bug:** **How to reproduce the bug:**
If possible, please include a minimal code snippet here. If possible, please include a minimal code snippet here.
**Environment:**
Go Version: please paste `go version` output here
```
please paste `go env` output here
```

View File

@@ -1,6 +1,6 @@
--- ---
name: Feature request / Enhancement name: Feature request / Enhancement
about: If you have a need not served by go-micro about: If you have a need not served by micro
title: "[FEATURE]" title: "[FEATURE]"
labels: '' labels: ''
assignees: '' assignees: ''

View File

@@ -1,14 +1,8 @@
--- ---
name: Question name: Question
about: Ask a question about go-micro about: Ask a question about micro
title: '' title: ''
labels: '' labels: ''
assignees: '' assignees: ''
--- ---
Before asking, please check if your question has already been answered:
1. Check the documentation - https://micro.mu/docs/
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
3. Search existing issues

28
.github/autoapprove.yml vendored Normal file
View File

@@ -0,0 +1,28 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
workflow_run:
workflows: ["prbuild"]
types:
- completed
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
"chmod +x ./tea",
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
"./tea pr --repo ${{ github.event.repository.name }}"
]
if: github.actor == 'vtolstov'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -18,34 +18,76 @@ jobs:
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
- name: check master
id: check_master
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync master - name: sync master
if: steps.check_master.outputs.sync_needed == 'true'
run: | run: |
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo cd repo
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY} git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
git merge upstream/master git pull --rebase upstream master
git push upstream master --progress git push upstream master --progress
git push origin master --progress git push origin master --progress
cd ../ cd ../
rm -rf repo rm -rf repo
- name: check v3
id: check_v3
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync v3 - name: sync v3
if: steps.check_v3.outputs.sync_needed == 'true'
run: | run: |
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo cd repo
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY} git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
git merge upstream/v3 git pull --rebase upstream v3
git push upstream v3 --progress git push upstream v3 --progress
git push origin v3 --progress git push origin v3 --progress
cd ../ cd ../
rm -rf repo rm -rf repo
- name: check v4
id: check_v4
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync v4 - name: sync v4
if: steps.check_v4.outputs.sync_needed == 'true'
run: | run: |
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo cd repo
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY} git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
git merge upstream/v4 git pull --rebase upstream v4
git push upstream v4 --progress git push upstream v4 --progress
git push origin v4 --progress git push origin v4 --progress
cd ../ cd ../

38
grpc.go
View File

@@ -38,8 +38,8 @@ type grpcClient struct {
funcStream client.FuncStream funcStream client.FuncStream
pool *ConnPool pool *ConnPool
opts client.Options opts client.Options
sync.RWMutex mu sync.RWMutex
init bool init bool
} }
// secure returns the dial option for whether its a secure or insecure connection // secure returns the dial option for whether its a secure or insecure connection
@@ -74,12 +74,12 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
} }
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string][]string var header gmetadata.MD
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
header = metadata.Copy(md).AsHTTP2() header = metadata.Copy(md).AsHTTP2()
} else { } else {
header = make(map[string][]string, 2) header = make(gmetadata.MD, 4)
} }
if opts.RequestMetadata != nil { if opts.RequestMetadata != nil {
for k, v := range opts.RequestMetadata { for k, v := range opts.RequestMetadata {
@@ -87,9 +87,11 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
} }
} }
// set timeout in nanoseconds // set timeout in nanoseconds
header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout)) if opts.RequestTimeout > time.Duration(0) {
header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout)) header.Set("grpc-timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
header["content-type"] = append(header["content-type"], req.ContentType()) header.Set("timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
}
header.Set("content-type", req.ContentType())
ctx = gmetadata.NewOutgoingContext(ctx, header) ctx = gmetadata.NewOutgoingContext(ctx, header)
@@ -185,21 +187,19 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
} }
func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string][]string var header gmetadata.MD
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
header = metadata.Copy(md).AsHTTP2() header = metadata.Copy(md).AsHTTP2()
} else { } else {
header = make(map[string][]string) header = make(gmetadata.MD, 4)
} }
// set timeout in nanoseconds if opts.RequestTimeout > time.Duration(0) {
if opts.StreamTimeout > time.Duration(0) { header.Set("grpc-timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout)) header.Set("timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
} }
// set the content type for the request header.Set("content-type", req.ContentType())
header["content-type"] = append(header["content-type"], req.ContentType())
ctx = gmetadata.NewOutgoingContext(ctx, header) ctx = gmetadata.NewOutgoingContext(ctx, header)
@@ -361,8 +361,8 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
} }
func (g *grpcClient) newCodec(ct string) (codec.Codec, error) { func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
g.RLock() g.mu.RLock()
defer g.RUnlock() defer g.mu.RUnlock()
if idx := strings.IndexRune(ct, ';'); idx >= 0 { if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx] ct = ct[:idx]
@@ -398,10 +398,10 @@ func (g *grpcClient) Init(opts ...client.Option) error {
// update pool configuration if the options changed // update pool configuration if the options changed
if size != g.opts.PoolSize || ttl != g.opts.PoolTTL { if size != g.opts.PoolSize || ttl != g.opts.PoolTTL {
g.pool.Lock() g.pool.mu.Lock()
g.pool.size = g.opts.PoolSize g.pool.size = g.opts.PoolSize
g.pool.ttl = int64(g.opts.PoolTTL.Seconds()) g.pool.ttl = int64(g.opts.PoolTTL.Seconds())
g.pool.Unlock() g.pool.mu.Unlock()
} }
g.funcCall = g.fnCall g.funcCall = g.fnCall

View File

@@ -16,7 +16,7 @@ type ConnPool struct {
ttl int64 ttl int64
maxStreams int maxStreams int
maxIdle int maxIdle int
sync.Mutex mu sync.Mutex
} }
type streamsPool struct { type streamsPool struct {
@@ -64,7 +64,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
addr = addr[strings.Index(addr, ":")+3:] addr = addr[strings.Index(addr, ":")+3:]
} }
now := time.Now().Unix() now := time.Now().Unix()
p.Lock() p.mu.Lock()
sp, ok := p.conns[addr] sp, ok := p.conns[addr]
if !ok { if !ok {
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0} sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
@@ -125,10 +125,10 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
} }
// a good conn // a good conn
conn.streams++ conn.streams++
p.Unlock() p.mu.Unlock()
return conn, nil return conn, nil
} }
p.Unlock() p.mu.Unlock()
// nolint (TODO need fix) create new conn) // nolint (TODO need fix) create new conn)
cc, err := grpc.DialContext(ctx, addr, opts...) cc, err := grpc.DialContext(ctx, addr, opts...)
@@ -138,24 +138,24 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false} conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
// add conn to streams pool // add conn to streams pool
p.Lock() p.mu.Lock()
if sp.count < p.size { if sp.count < p.size {
addConnAfter(conn, sp.head) addConnAfter(conn, sp.head)
} }
p.Unlock() p.mu.Unlock()
return conn, nil return conn, nil
} }
func (p *ConnPool) Put(conn *PoolConn, err error) { func (p *ConnPool) Put(conn *PoolConn, err error) {
p.Lock() p.mu.Lock()
p, sp, created := conn.pool, conn.sp, conn.created p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn // try to add conn
if !conn.in && sp.count < p.size { if !conn.in && sp.count < p.size {
addConnAfter(conn, sp.head) addConnAfter(conn, sp.head)
} }
if !conn.in { if !conn.in {
p.Unlock() p.mu.Unlock()
conn.ClientConn.Close() conn.ClientConn.Close()
return return
} }
@@ -173,13 +173,13 @@ func (p *ConnPool) Put(conn *PoolConn, err error) {
now := time.Now().Unix() now := time.Now().Unix()
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
removeConn(conn) removeConn(conn)
p.Unlock() p.mu.Unlock()
conn.ClientConn.Close() conn.ClientConn.Close()
return return
} }
sp.idle++ sp.idle++
} }
p.Unlock() p.mu.Unlock()
} }
func (conn *PoolConn) Close() { func (conn *PoolConn) Close() {

View File

@@ -19,8 +19,8 @@ type grpcStream struct {
response client.Response response client.Response
close func(err error) close func(err error)
conn *PoolConn conn *PoolConn
sync.RWMutex mu sync.RWMutex
closed bool closed bool
} }
func (g *grpcStream) Context() context.Context { func (g *grpcStream) Context() context.Context {
@@ -88,15 +88,15 @@ func (g *grpcStream) RecvMsg(msg interface{}) (err error) {
} }
func (g *grpcStream) Error() error { func (g *grpcStream) Error() error {
g.RLock() g.mu.RLock()
defer g.RUnlock() defer g.mu.RUnlock()
return g.err return g.err
} }
func (g *grpcStream) setError(e error) { func (g *grpcStream) setError(e error) {
g.Lock() g.mu.Lock()
g.err = e g.err = e
g.Unlock() g.mu.Unlock()
} }
// Close the gRPC send stream // Close the gRPC send stream
@@ -105,8 +105,8 @@ func (g *grpcStream) setError(e error) {
// stream should still be able to receive after this function call // stream should still be able to receive after this function call
// TODO: should the conn be closed in another way? // TODO: should the conn be closed in another way?
func (g *grpcStream) Close() error { func (g *grpcStream) Close() error {
g.Lock() g.mu.Lock()
defer g.Unlock() defer g.mu.Unlock()
if g.closed { if g.closed {
return nil return nil
@@ -125,8 +125,8 @@ func (g *grpcStream) Close() error {
} }
func (g *grpcStream) CloseSend() error { func (g *grpcStream) CloseSend() error {
g.Lock() g.mu.Lock()
defer g.Unlock() defer g.mu.Unlock()
if g.closed { if g.closed {
return nil return nil