Compare commits

..

2 Commits
v4 ... master

Author SHA1 Message Date
9526345cd6 Update workflows (#135)
All checks were successful
test / test (push) Successful in 1m55s
- Rename workflows
- pr -> job_lint
- build -> job_test
- update golangci

Co-authored-by: Aleksandr Tolstikhin <atolstikhin@mtsbank.ru>
Reviewed-on: #135
Co-authored-by: Александр Толстихин <tolstihin1996@mail.ru>
Co-committed-by: Александр Толстихин <tolstihin1996@mail.ru>
2024-12-11 00:31:34 +03:00
e3fe27105b Merge pull request 'add defaultConfigService' (#133) from devstigneev/micro-client-grpc:issue_132 into master
Some checks failed
build / test (push) Successful in 1m38s
build / lint (push) Successful in 9m26s
codeql / analyze (go) (push) Failing after 13m37s
Reviewed-on: #133
2024-02-29 17:07:00 +03:00
25 changed files with 1387 additions and 524 deletions

View File

@@ -1,6 +1,6 @@
--- ---
name: Bug report name: Bug report
about: For reporting bugs in micro about: For reporting bugs in go-micro
title: "[BUG]" title: "[BUG]"
labels: '' labels: ''
assignees: '' assignees: ''
@@ -16,3 +16,9 @@ assignees: ''
**How to reproduce the bug:** **How to reproduce the bug:**
If possible, please include a minimal code snippet here. If possible, please include a minimal code snippet here.
**Environment:**
Go Version: please paste `go version` output here
```
please paste `go env` output here
```

View File

@@ -1,6 +1,6 @@
--- ---
name: Feature request / Enhancement name: Feature request / Enhancement
about: If you have a need not served by micro about: If you have a need not served by go-micro
title: "[FEATURE]" title: "[FEATURE]"
labels: '' labels: ''
assignees: '' assignees: ''

View File

@@ -0,0 +1,14 @@
---
name: Question
about: Ask a question about go-micro
title: ''
labels: ''
assignees: ''
---
Before asking, please check if your question has already been answered:
1. Check the documentation - https://micro.mu/docs/
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
3. Search existing issues

View File

@@ -3,10 +3,10 @@ name: lint
on: on:
pull_request: pull_request:
types: [opened, reopened, synchronize] types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ] branches:
paths-ignore: - master
- '.github/**' - v3
- '.gitea/**' - v4
jobs: jobs:
lint: lint:
@@ -24,6 +24,6 @@ jobs:
- name: setup deps - name: setup deps
run: go get -v ./... run: go get -v ./...
- name: run lint - name: run lint
uses: golangci/golangci-lint-action@v6 uses: https://github.com/golangci/golangci-lint-action@v6
with: with:
version: 'latest' version: 'latest'

View File

@@ -3,12 +3,15 @@ name: test
on: on:
pull_request: pull_request:
types: [opened, reopened, synchronize] types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ] branches:
- master
- v3
- v4
push: push:
branches: [ master, v3, v4 ] branches:
paths-ignore: - master
- '.github/**' - v3
- '.gitea/**' - v4
jobs: jobs:
test: test:

View File

@@ -3,12 +3,15 @@ name: test
on: on:
pull_request: pull_request:
types: [opened, reopened, synchronize] types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ] branches:
- master
- v3
- v4
push: push:
branches: [ master, v3, v4 ] branches:
paths-ignore: - master
- '.github/**' - v3
- '.gitea/**' - v4
jobs: jobs:
test: test:
@@ -32,19 +35,19 @@ jobs:
go-version: 'stable' go-version: 'stable'
- name: setup go work - name: setup go work
env: env:
GOWORK: ${{ github.workspace }}/go.work GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: | run: |
go work init go work init
go work use . go work use .
go work use micro-tests go work use micro-tests
- name: setup deps - name: setup deps
env: env:
GOWORK: ${{ github.workspace }}/go.work GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: go get -v ./... run: go get -v ./...
- name: run tests - name: run tests
env: env:
INTEGRATION_TESTS: yes INTEGRATION_TESTS: yes
GOWORK: ${{ github.workspace }}/go.work GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: | run: |
cd micro-tests cd micro-tests
go test -mod readonly -v ./... || true go test -mod readonly -v ./... || true

View File

@@ -1,8 +0,0 @@
---
name: Question
about: Ask a question about micro
title: ''
labels: ''
assignees: ''
---

View File

@@ -1,28 +0,0 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
workflow_run:
workflows: ["prbuild"]
types:
- completed
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
"chmod +x ./tea",
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
"./tea pr --repo ${{ github.event.repository.name }}"
]
if: github.actor == 'vtolstov'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -1,53 +0,0 @@
name: coverage
on:
push:
branches: [ main, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
pull_request:
branches: [ main, v3, v4 ]
jobs:
build:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: test coverage
run: |
go test -v -cover ./... -covermode=count -coverprofile coverage.out -coverpkg ./...
go tool cover -func coverage.out -o coverage.out
- name: coverage badge
uses: tj-actions/coverage-badge-go@v2
with:
green: 80
filename: coverage.out
- uses: stefanzweifel/git-auto-commit-action@v4
name: autocommit
with:
commit_message: Apply Code Coverage Badge
skip_fetch: false
skip_checkout: false
file_pattern: ./README.md
- name: push
if: steps.auto-commit-action.outputs.changes_detected == 'true'
uses: ad-m/github-push-action@master
with:
github_token: ${{ github.token }}
branch: ${{ github.ref }}

View File

@@ -1,94 +0,0 @@
name: sync
on:
schedule:
- cron: '*/5 * * * *'
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
sync:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: init
run: |
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
git config --global user.name "github-actions[bot]"
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
- name: check master
id: check_master
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync master
if: steps.check_master.outputs.sync_needed == 'true'
run: |
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream master
git push upstream master --progress
git push origin master --progress
cd ../
rm -rf repo
- name: check v3
id: check_v3
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync v3
if: steps.check_v3.outputs.sync_needed == 'true'
run: |
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v3
git push upstream v3 --progress
git push origin v3 --progress
cd ../
rm -rf repo
- name: check v4
id: check_v4
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync v4
if: steps.check_v4.outputs.sync_needed == 'true'
run: |
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v4
git push upstream v4 --progress
git push origin v4 --progress
cd ../
rm -rf repo

View File

@@ -1,5 +1,5 @@
run: run:
concurrency: 8 concurrency: 8
timeout: 5m deadline: 5m
issues-exit-code: 1 issues-exit-code: 1
tests: true tests: true

View File

@@ -1,4 +0,0 @@
# GRPC Client
![Coverage](https://img.shields.io/badge/Coverage-2.3%25-red)
This plugin is a grpc client for micro.

View File

@@ -1,7 +1,9 @@
package grpc package grpc
import ( import (
"go.unistack.org/micro/v4/codec" "io"
"go.unistack.org/micro/v3/codec"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )
@@ -63,3 +65,63 @@ func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}, opts ...codec.Option)
} }
return w.Codec.Unmarshal(d, v) return w.Codec.Unmarshal(d, v)
} }
/*
type grpcCodec struct {
grpc.ServerStream
// headers
id string
target string
method string
endpoint string
c encoding.Codec
}
*/
func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
/*
if m == nil {
m = codec.NewMessage(codec.Request)
}
if md, ok := metadata.FromIncomingContext(g.ServerStream.Context()); ok {
if m.Header == nil {
m.Header = meta.New(len(md))
}
for k, v := range md {
m.Header[k] = strings.Join(v, ",")
}
}
m.Id = g.id
m.Target = g.target
m.Method = g.method
m.Endpoint = g.endpoint
*/
return nil
}
func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error {
// caller has requested a frame
if m, ok := v.(*codec.Frame); ok {
_, err := conn.Read(m.Data)
return err
}
return codec.ErrInvalidMessage
}
func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
// if we don't have a body
if v != nil {
b, err := w.Marshal(v)
if err != nil {
return err
}
m.Body = b
}
// write the body using the framing codec
_, err := conn.Write(m.Body)
return err
}

View File

@@ -1,67 +0,0 @@
package grpc
import (
"context"
"testing"
"go.unistack.org/micro/v4/codec"
gmetadata "google.golang.org/grpc/metadata"
)
type mockStream struct {
msg any
}
func (m mockStream) Header() (gmetadata.MD, error) {
return nil, nil
}
func (m mockStream) Trailer() gmetadata.MD {
return nil
}
func (m mockStream) CloseSend() error {
return nil
}
func (m mockStream) Context() context.Context {
return nil
}
func (m *mockStream) SendMsg(msg any) error {
m.msg = msg
return nil
}
func (m *mockStream) RecvMsg(msg any) error {
c := msg.(*codec.Frame)
c.Data = m.msg.(*codec.Frame).Data
return nil
}
func Test_ReadWrap(t *testing.T) {
wp := wrapStream{
&mockStream{},
}
write, err := wp.Write([]byte("test_data"))
if err != nil {
t.Fatal(err)
}
if write != 9 {
t.Error("uncorrected number wrote bytes")
}
b := make([]byte, write)
read, err := wp.Read(b)
if err != nil {
t.Fatal(err)
}
if read != 9 || string(b) != "test_data" {
t.Error("uncorrected number wrote bytes or data")
}
}

View File

@@ -1,7 +1,7 @@
package grpc package grpc
import ( import (
"go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v3/errors"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )

24
go.mod
View File

@@ -1,24 +1,8 @@
module go.unistack.org/micro-client-grpc/v4 module go.unistack.org/micro-client-grpc/v3
go 1.23.0 go 1.16
toolchain go1.23.3
require ( require (
go.unistack.org/micro/v4 v4.1.7 go.unistack.org/micro/v3 v3.10.22
google.golang.org/grpc v1.72.0 google.golang.org/grpc v1.52.3
)
require (
github.com/ash3in/uuidv8 v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/matoous/go-nanoid v1.5.1 // indirect
github.com/spf13/cast v1.7.1 // indirect
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
) )

1083
go.sum

File diff suppressed because it is too large Load Diff

284
grpc.go
View File

@@ -1,32 +1,28 @@
// Package grpc provides a gRPC client for micro framework // Package grpc provides a gRPC client
package grpc package grpc // import "go.unistack.org/micro-client-grpc/v3"
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net" "net"
"os"
"reflect" "reflect"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v4/client" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v4/options" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v4/selector" "go.unistack.org/micro/v3/selector"
"go.unistack.org/micro/v4/semconv"
"go.unistack.org/micro/v4/tracer"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata" gmetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
) )
const ( const (
@@ -34,12 +30,10 @@ const (
) )
type grpcClient struct { type grpcClient struct {
funcCall client.FuncCall pool *ConnPool
funcStream client.FuncStream opts client.Options
pool *ConnPool sync.RWMutex
opts client.Options init bool
mu sync.RWMutex
init bool
} }
// secure returns the dial option for whether its a secure or insecure connection // secure returns the dial option for whether its a secure or insecure connection
@@ -74,30 +68,32 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
} }
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header gmetadata.MD var header map[string]string
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
header = metadata.Copy(md).AsHTTP2() header = make(map[string]string, len(md))
for k, v := range md {
header[strings.ToLower(k)] = v
}
} else { } else {
header = make(gmetadata.MD, 4) header = make(map[string]string, 2)
} }
if opts.RequestMetadata != nil { if opts.RequestMetadata != nil {
for k, v := range opts.RequestMetadata { for k, v := range opts.RequestMetadata {
header[strings.ToLower(k)] = v header[k] = v
} }
} }
// set timeout in nanoseconds // set timeout in nanoseconds
if opts.RequestTimeout > time.Duration(0) { header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
header.Set("grpc-timeout", fmt.Sprintf("%dn", opts.RequestTimeout)) header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
header.Set("timeout", fmt.Sprintf("%dn", opts.RequestTimeout)) header["content-type"] = req.ContentType()
}
header.Set("content-type", req.ContentType())
ctx = gmetadata.NewOutgoingContext(ctx, header) md := gmetadata.New(header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
cf, err := g.newCodec(req.ContentType()) cf, err := g.newCodec(req.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", "%+v", err) return errors.InternalServerError("go.micro.client", err.Error())
} }
maxRecvMsgSize := g.maxRecvMsgSizeValue() maxRecvMsgSize := g.maxRecvMsgSizeValue()
@@ -124,9 +120,6 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpc.WithDefaultServiceConfig(cfgService), grpc.WithDefaultServiceConfig(cfgService),
} }
if opts := g.getGrpcDialOptions(g.opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
if opts := g.getGrpcDialOptions(opts.Context); opts != nil { if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
@@ -141,7 +134,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", "Error sending request: %v", err) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
@@ -179,7 +172,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
if opts.ResponseMetadata != nil { if opts.ResponseMetadata != nil {
*opts.ResponseMetadata = metadata.New(gmd.Len()) *opts.ResponseMetadata = metadata.New(gmd.Len())
for k, v := range gmd { for k, v := range gmd {
opts.ResponseMetadata.Append(k, v...) opts.ResponseMetadata.Set(k, strings.Join(v, ","))
} }
} }
@@ -187,25 +180,31 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
} }
func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header gmetadata.MD var header map[string]string
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
header = metadata.Copy(md).AsHTTP2() header = make(map[string]string, len(md))
for k, v := range md {
header[k] = v
}
} else { } else {
header = make(gmetadata.MD, 4) header = make(map[string]string)
} }
if opts.RequestTimeout > time.Duration(0) { // set timeout in nanoseconds
header.Set("grpc-timeout", fmt.Sprintf("%dn", opts.RequestTimeout)) if opts.StreamTimeout > time.Duration(0) {
header.Set("timeout", fmt.Sprintf("%dn", opts.RequestTimeout)) header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
} }
header.Set("content-type", req.ContentType()) // set the content type for the request
header["content-type"] = req.ContentType()
ctx = gmetadata.NewOutgoingContext(ctx, header) md := gmetadata.New(header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
cf, err := g.newCodec(req.ContentType()) cf, err := g.newCodec(req.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", "%+v", err) return errors.InternalServerError("go.micro.client", err.Error())
} }
var dialCtx context.Context var dialCtx context.Context
@@ -246,7 +245,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", "Error sending request: %v", err) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
desc := &grpc.StreamDesc{ desc := &grpc.StreamDesc{
@@ -279,7 +278,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// release the connection // release the connection
g.pool.Put(cc, err) g.pool.Put(cc, err)
// now return the error // now return the error
return errors.InternalServerError("go.micro.client", "Error creating stream: %v", err) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
// set request codec // set request codec
@@ -361,8 +360,8 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
} }
func (g *grpcClient) newCodec(ct string) (codec.Codec, error) { func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
g.mu.RLock() g.RLock()
defer g.mu.RUnlock() defer g.RUnlock()
if idx := strings.IndexRune(ct, ';'); idx >= 0 { if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx] ct = ct[:idx]
@@ -398,23 +397,31 @@ func (g *grpcClient) Init(opts ...client.Option) error {
// update pool configuration if the options changed // update pool configuration if the options changed
if size != g.opts.PoolSize || ttl != g.opts.PoolTTL { if size != g.opts.PoolSize || ttl != g.opts.PoolTTL {
g.pool.mu.Lock() g.pool.Lock()
g.pool.size = g.opts.PoolSize g.pool.size = g.opts.PoolSize
g.pool.ttl = int64(g.opts.PoolTTL.Seconds()) g.pool.ttl = int64(g.opts.PoolTTL.Seconds())
g.pool.mu.Unlock() g.pool.Unlock()
} }
g.funcCall = g.fnCall if err := g.opts.Broker.Init(); err != nil {
g.funcStream = g.fnStream return err
}
if err := g.opts.Tracer.Init(); err != nil {
return err
}
if err := g.opts.Router.Init(); err != nil {
return err
}
if err := g.opts.Logger.Init(); err != nil {
return err
}
if err := g.opts.Meter.Init(); err != nil {
return err
}
if err := g.opts.Transport.Init(); err != nil {
return err
}
g.opts.Hooks.EachPrev(func(hook options.Hook) {
switch h := hook.(type) {
case client.HookCall:
g.funcCall = h(g.funcCall)
case client.HookStream:
g.funcStream = h(g.funcStream)
}
})
g.init = true g.init = true
return nil return nil
@@ -424,6 +431,10 @@ func (g *grpcClient) Options() client.Options {
return g.opts return g.opts
} }
func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
return newGRPCEvent(topic, msg, g.opts.ContentType, opts...)
}
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...) return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
} }
@@ -434,32 +445,6 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
} else if rsp == nil { } else if rsp == nil {
return errors.InternalServerError("go.micro.client", "rsp is nil") return errors.InternalServerError("go.micro.client", "rsp is nil")
} }
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
err := g.funcCall(ctx, req, rsp, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := errors.FromError(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
}
return err
}
func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// make a copy of call opts // make a copy of call opts
callOpts := g.opts.CallOptions callOpts := g.opts.CallOptions
@@ -491,6 +476,11 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
// make copy of call method // make copy of call method
gcall := g.call gcall := g.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gcall = callOpts.CallWrappers[i-1](gcall)
}
// use the router passed as a call option, or fallback to the rpc clients router // use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil { if callOpts.Router == nil {
callOpts.Router = g.opts.Router callOpts.Router = g.opts.Router
@@ -512,7 +502,7 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i) t, err := callOpts.Backoff(ctx, req, i)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", "%+v", err) return errors.InternalServerError("go.micro.client", err.Error())
} }
// only sleep if greater than 0 // only sleep if greater than 0
@@ -527,7 +517,7 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
// TODO apply any filtering here // TODO apply any filtering here
routes, err = g.opts.Lookup(ctx, req, callOpts) routes, err = g.opts.Lookup(ctx, req, callOpts)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", "%+v", err) return errors.InternalServerError("go.micro.client", err.Error())
} }
// balance the list of nodes // balance the list of nodes
@@ -590,31 +580,6 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
} }
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
stream, err := g.funcStream(ctx, req, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := status.Convert(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code()))).Inc()
}
return stream, err
}
func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
// make a copy of call opts // make a copy of call opts
callOpts := g.opts.CallOptions callOpts := g.opts.CallOptions
for _, opt := range opts { for _, opt := range opts {
@@ -633,6 +598,11 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
// make a copy of stream // make a copy of stream
gstream := g.stream gstream := g.stream
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gstream = callOpts.CallWrappers[i-1](gstream)
}
// use the router passed as a call option, or fallback to the rpc clients router // use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil { if callOpts.Router == nil {
callOpts.Router = g.opts.Router callOpts.Router = g.opts.Router
@@ -654,7 +624,7 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i) t, err := callOpts.Backoff(ctx, req, i)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", "%+v", err) return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
// only sleep if greater than 0 // only sleep if greater than 0
@@ -669,7 +639,7 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
// TODO apply any filtering here // TODO apply any filtering here
routes, err = g.opts.Lookup(ctx, req, callOpts) routes, err = g.opts.Lookup(ctx, req, callOpts)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", "%+v", err) return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
// balance the list of nodes // balance the list of nodes
@@ -742,6 +712,71 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
return nil, grr return nil, grr
} }
func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
return g.publish(ctx, ps, opts...)
}
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
return g.publish(ctx, []client.Message{p}, opts...)
}
func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
var body []byte
options := client.NewPublishOptions(opts...)
// get proxy
exchange := ""
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
exchange = v
}
msgs := make([]*broker.Message, 0, len(ps))
omd, ok := metadata.FromOutgoingContext(ctx)
if !ok {
omd = metadata.New(2)
}
for _, p := range ps {
md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType()
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data
} else {
// use codec for payload
cf, err := g.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
}
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
for k, v := range p.Metadata() {
md.Set(k, v)
}
md.Set(metadata.HeaderTopic, topic)
msgs = append(msgs, &broker.Message{Header: md, Body: body})
}
return g.opts.Broker.BatchPublish(ctx, msgs,
broker.PublishContext(options.Context),
broker.PublishBodyOnly(options.BodyOnly),
)
}
func (g *grpcClient) String() string { func (g *grpcClient) String() string {
return "grpc" return "grpc"
} }
@@ -801,16 +836,22 @@ func NewClient(opts ...client.Option) client.Client {
options.ContentType = DefaultContentType options.ContentType = DefaultContentType
} }
c := &grpcClient{ rc := &grpcClient{
opts: options, opts: options,
} }
c.pool = NewConnPool(options.PoolSize, options.PoolTTL, c.poolMaxIdle(), c.poolMaxStreams()) rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
c := client.Client(rc)
if c.opts.Context != nil { // wrap in reverse
if codecs, ok := c.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil { for i := len(options.Wrappers); i > 0; i-- {
c = options.Wrappers[i-1](c)
}
if rc.opts.Context != nil {
if codecs, ok := rc.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
for k, v := range codecs { for k, v := range codecs {
c.opts.Codecs[k] = &wrapGrpcCodec{v} rc.opts.Codecs[k] = &wrapGrpcCodec{v}
} }
} }
} }
@@ -819,8 +860,5 @@ func NewClient(opts ...client.Option) client.Client {
encoding.RegisterCodec(&wrapMicroCodec{k}) encoding.RegisterCodec(&wrapMicroCodec{k})
} }
c.funcCall = c.fnCall
c.funcStream = c.fnStream
return c return c
} }

View File

@@ -16,7 +16,7 @@ type ConnPool struct {
ttl int64 ttl int64
maxStreams int maxStreams int
maxIdle int maxIdle int
mu sync.Mutex sync.Mutex
} }
type streamsPool struct { type streamsPool struct {
@@ -64,7 +64,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
addr = addr[strings.Index(addr, ":")+3:] addr = addr[strings.Index(addr, ":")+3:]
} }
now := time.Now().Unix() now := time.Now().Unix()
p.mu.Lock() p.Lock()
sp, ok := p.conns[addr] sp, ok := p.conns[addr]
if !ok { if !ok {
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0} sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
@@ -125,12 +125,12 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
} }
// a good conn // a good conn
conn.streams++ conn.streams++
p.mu.Unlock() p.Unlock()
return conn, nil return conn, nil
} }
p.mu.Unlock() p.Unlock()
// nolint (TODO need fix) create new conn) // create new conn)
cc, err := grpc.DialContext(ctx, addr, opts...) cc, err := grpc.DialContext(ctx, addr, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -138,24 +138,24 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false} conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
// add conn to streams pool // add conn to streams pool
p.mu.Lock() p.Lock()
if sp.count < p.size { if sp.count < p.size {
addConnAfter(conn, sp.head) addConnAfter(conn, sp.head)
} }
p.mu.Unlock() p.Unlock()
return conn, nil return conn, nil
} }
func (p *ConnPool) Put(conn *PoolConn, err error) { func (p *ConnPool) Put(conn *PoolConn, err error) {
p.mu.Lock() p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn // try to add conn
if !conn.in && sp.count < p.size { if !conn.in && sp.count < p.size {
addConnAfter(conn, sp.head) addConnAfter(conn, sp.head)
} }
if !conn.in { if !conn.in {
p.mu.Unlock() p.Unlock()
conn.ClientConn.Close() conn.ClientConn.Close()
return return
} }
@@ -173,13 +173,13 @@ func (p *ConnPool) Put(conn *PoolConn, err error) {
now := time.Now().Unix() now := time.Now().Unix()
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
removeConn(conn) removeConn(conn)
p.mu.Unlock() p.Unlock()
conn.ClientConn.Close() conn.ClientConn.Close()
return return
} }
sp.idle++ sp.idle++
} }
p.mu.Unlock() p.Unlock()
} }
func (conn *PoolConn) Close() { func (conn *PoolConn) Close() {

44
message.go Normal file
View File

@@ -0,0 +1,44 @@
package grpc
import (
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/metadata"
)
type grpcEvent struct {
payload interface{}
topic string
contentType string
opts client.MessageOptions
}
func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
options := client.NewMessageOptions(opts...)
if len(options.ContentType) > 0 {
contentType = options.ContentType
}
return &grpcEvent{
payload: payload,
topic: topic,
contentType: contentType,
opts: options,
}
}
func (g *grpcEvent) ContentType() string {
return g.contentType
}
func (g *grpcEvent) Topic() string {
return g.topic
}
func (g *grpcEvent) Payload() interface{} {
return g.payload
}
func (g *grpcEvent) Metadata() metadata.Metadata {
return g.opts.Metadata
}

View File

@@ -4,7 +4,7 @@ package grpc
import ( import (
"context" "context"
"go.unistack.org/micro/v4/client" "go.unistack.org/micro/v3/client"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )
@@ -98,8 +98,8 @@ func MaxSendMsgSize(s int) client.Option {
type grpcDialOptions struct{} type grpcDialOptions struct{}
// DialOptions to be used to configure gRPC dial options // DialOptions to be used to configure gRPC dial options
func DialOptions(opts ...grpc.DialOption) client.Option { func DialOptions(opts ...grpc.DialOption) client.CallOption {
return func(o *client.Options) { return func(o *client.CallOptions) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }

View File

@@ -4,8 +4,8 @@ import (
"fmt" "fmt"
"strings" "strings"
"go.unistack.org/micro/v4/client" "go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v3/codec"
) )
type grpcRequest struct { type grpcRequest struct {

View File

@@ -1,8 +1,10 @@
package grpc package grpc
import ( import (
"go.unistack.org/micro/v4/codec" "strings"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@@ -23,19 +25,18 @@ func (r *response) Header() metadata.Metadata {
if err != nil { if err != nil {
return nil return nil
} }
md := metadata.New(len(meta))
return metadata.Metadata(meta.Copy()) for k, v := range meta {
md.Set(k, strings.Join(v, ","))
}
return md
} }
// Read the undecoded response // Read the undecoded response
func (r *response) Read() ([]byte, error) { func (r *response) Read() ([]byte, error) {
f := &codec.Frame{} f := &codec.Frame{}
wrap := &wrapStream{r.stream} if err := r.codec.ReadBody(&wrapStream{r.stream}, f); err != nil {
_, err := wrap.Read(f.Data)
if err != nil {
return nil, err return nil, err
} }
return f.Data, nil return f.Data, nil
} }

View File

@@ -5,8 +5,7 @@ import (
"io" "io"
"sync" "sync"
"go.unistack.org/micro/v4/client" "go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v4/tracer"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@@ -19,8 +18,8 @@ type grpcStream struct {
response client.Response response client.Response
close func(err error) close func(err error)
conn *PoolConn conn *PoolConn
mu sync.RWMutex sync.RWMutex
closed bool closed bool
} }
func (g *grpcStream) Context() context.Context { func (g *grpcStream) Context() context.Context {
@@ -88,15 +87,15 @@ func (g *grpcStream) RecvMsg(msg interface{}) (err error) {
} }
func (g *grpcStream) Error() error { func (g *grpcStream) Error() error {
g.mu.RLock() g.RLock()
defer g.mu.RUnlock() defer g.RUnlock()
return g.err return g.err
} }
func (g *grpcStream) setError(e error) { func (g *grpcStream) setError(e error) {
g.mu.Lock() g.Lock()
g.err = e g.err = e
g.mu.Unlock() g.Unlock()
} }
// Close the gRPC send stream // Close the gRPC send stream
@@ -105,19 +104,13 @@ func (g *grpcStream) setError(e error) {
// stream should still be able to receive after this function call // stream should still be able to receive after this function call
// TODO: should the conn be closed in another way? // TODO: should the conn be closed in another way?
func (g *grpcStream) Close() error { func (g *grpcStream) Close() error {
g.mu.Lock() g.Lock()
defer g.mu.Unlock() defer g.Unlock()
if g.closed { if g.closed {
return nil return nil
} }
if sp, ok := tracer.SpanFromContext(g.context); ok && sp != nil {
if g.err != nil {
sp.SetStatus(tracer.SpanStatusError, g.err.Error())
}
sp.Finish()
}
// close the connection // close the connection
g.closed = true g.closed = true
g.close(g.err) g.close(g.err)
@@ -125,8 +118,8 @@ func (g *grpcStream) Close() error {
} }
func (g *grpcStream) CloseSend() error { func (g *grpcStream) CloseSend() error {
g.mu.Lock() g.Lock()
defer g.mu.Unlock() defer g.Unlock()
if g.closed { if g.closed {
return nil return nil