prepare v4 (#139)
move to v4 micro Co-authored-by: Василий Толстов <v.tolstov@unistack.org> Co-authored-by: Александр Толстихин <tolstihin1996@mail.ru> Reviewed-on: #139 Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru> Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
This commit is contained in:
		
							
								
								
									
										2
									
								
								codec.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								codec.go
									
									
									
									
									
								
							@@ -1,7 +1,7 @@
 | 
			
		||||
package grpc
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"go.unistack.org/micro/v3/codec"
 | 
			
		||||
	"go.unistack.org/micro/v4/codec"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
	"google.golang.org/grpc/encoding"
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -2,9 +2,10 @@ package grpc
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"go.unistack.org/micro/v3/codec"
 | 
			
		||||
	gmetadata "google.golang.org/grpc/metadata"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v4/codec"
 | 
			
		||||
	gmetadata "google.golang.org/grpc/metadata"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type mockStream struct {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										2
									
								
								error.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								error.go
									
									
									
									
									
								
							@@ -1,7 +1,7 @@
 | 
			
		||||
package grpc
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"go.unistack.org/micro/v3/errors"
 | 
			
		||||
	"go.unistack.org/micro/v4/errors"
 | 
			
		||||
	"google.golang.org/grpc/status"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										27
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										27
									
								
								go.mod
									
									
									
									
									
								
							@@ -1,19 +1,24 @@
 | 
			
		||||
module go.unistack.org/micro-client-grpc/v3
 | 
			
		||||
module go.unistack.org/micro-client-grpc/v4
 | 
			
		||||
 | 
			
		||||
go 1.21
 | 
			
		||||
go 1.23.0
 | 
			
		||||
 | 
			
		||||
toolchain go1.23.1
 | 
			
		||||
toolchain go1.23.3
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	go.unistack.org/micro/v3 v3.10.91
 | 
			
		||||
	google.golang.org/grpc v1.67.0
 | 
			
		||||
	go.unistack.org/micro/v4 v4.1.2
 | 
			
		||||
	google.golang.org/grpc v1.70.0
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	go.unistack.org/micro-proto/v3 v3.4.1 // indirect
 | 
			
		||||
	golang.org/x/net v0.29.0 // indirect
 | 
			
		||||
	golang.org/x/sys v0.25.0 // indirect
 | 
			
		||||
	golang.org/x/text v0.18.0 // indirect
 | 
			
		||||
	google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
 | 
			
		||||
	google.golang.org/protobuf v1.34.2 // indirect
 | 
			
		||||
	github.com/ash3in/uuidv8 v1.2.0 // indirect
 | 
			
		||||
	github.com/google/uuid v1.6.0 // indirect
 | 
			
		||||
	github.com/matoous/go-nanoid v1.5.1 // indirect
 | 
			
		||||
	github.com/spf13/cast v1.7.1 // indirect
 | 
			
		||||
	go.unistack.org/micro-proto/v4 v4.1.0 // indirect
 | 
			
		||||
	golang.org/x/net v0.35.0 // indirect
 | 
			
		||||
	golang.org/x/sys v0.30.0 // indirect
 | 
			
		||||
	golang.org/x/text v0.22.0 // indirect
 | 
			
		||||
	google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 // indirect
 | 
			
		||||
	google.golang.org/protobuf v1.36.5 // indirect
 | 
			
		||||
	gopkg.in/yaml.v3 v3.0.1 // indirect
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										71
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										71
									
								
								go.sum
									
									
									
									
									
								
							@@ -1,18 +1,57 @@
 | 
			
		||||
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
 | 
			
		||||
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
 | 
			
		||||
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
 | 
			
		||||
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
 | 
			
		||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
 | 
			
		||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
 | 
			
		||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
 | 
			
		||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
 | 
			
		||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
 | 
			
		||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
 | 
			
		||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
 | 
			
		||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
 | 
			
		||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 | 
			
		||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 | 
			
		||||
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
 | 
			
		||||
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
 | 
			
		||||
go.unistack.org/micro/v3 v3.10.91 h1:vuJY4tXwpqimwIkEJ3TozMYNVQQs+C5QMlQWPgSY/YM=
 | 
			
		||||
go.unistack.org/micro/v3 v3.10.91/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
 | 
			
		||||
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
 | 
			
		||||
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
 | 
			
		||||
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
 | 
			
		||||
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 | 
			
		||||
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
 | 
			
		||||
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
 | 
			
		||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
 | 
			
		||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
 | 
			
		||||
google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw=
 | 
			
		||||
google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
 | 
			
		||||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
 | 
			
		||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
 | 
			
		||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 | 
			
		||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 | 
			
		||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
 | 
			
		||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
 | 
			
		||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 | 
			
		||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 | 
			
		||||
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
 | 
			
		||||
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
 | 
			
		||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
 | 
			
		||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
 | 
			
		||||
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
 | 
			
		||||
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
 | 
			
		||||
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
 | 
			
		||||
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
 | 
			
		||||
go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M=
 | 
			
		||||
go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8=
 | 
			
		||||
go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4=
 | 
			
		||||
go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU=
 | 
			
		||||
go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU=
 | 
			
		||||
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
 | 
			
		||||
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
 | 
			
		||||
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
 | 
			
		||||
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
 | 
			
		||||
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
 | 
			
		||||
go.unistack.org/micro/v4 v4.1.2 h1:9SOlPYyPNNFpg1A7BsvhDyQm3gysLH1AhWbDCp1hyoY=
 | 
			
		||||
go.unistack.org/micro/v4 v4.1.2/go.mod h1:lr3oYED8Ay1vjK68QqRw30QOtdk/ffpZqMFDasOUhKw=
 | 
			
		||||
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
 | 
			
		||||
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
 | 
			
		||||
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
 | 
			
		||||
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 | 
			
		||||
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
 | 
			
		||||
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
 | 
			
		||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 h1:ZSlhAUqC4r8TPzqLXQ0m3upBNZeF+Y8jQ3c4CR3Ujms=
 | 
			
		||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I=
 | 
			
		||||
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
 | 
			
		||||
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
 | 
			
		||||
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
 | 
			
		||||
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
 | 
			
		||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 | 
			
		||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 | 
			
		||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 | 
			
		||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 | 
			
		||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										157
									
								
								grpc.go
									
									
									
									
									
								
							
							
						
						
									
										157
									
								
								grpc.go
									
									
									
									
									
								
							@@ -6,22 +6,20 @@ import (
 | 
			
		||||
	"crypto/tls"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net"
 | 
			
		||||
	"os"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/broker"
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/codec"
 | 
			
		||||
	"go.unistack.org/micro/v3/errors"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
	"go.unistack.org/micro/v3/options"
 | 
			
		||||
	"go.unistack.org/micro/v3/selector"
 | 
			
		||||
	"go.unistack.org/micro/v3/semconv"
 | 
			
		||||
	"go.unistack.org/micro/v3/tracer"
 | 
			
		||||
	"go.unistack.org/micro/v4/client"
 | 
			
		||||
	"go.unistack.org/micro/v4/codec"
 | 
			
		||||
	"go.unistack.org/micro/v4/errors"
 | 
			
		||||
	"go.unistack.org/micro/v4/metadata"
 | 
			
		||||
	"go.unistack.org/micro/v4/options"
 | 
			
		||||
	"go.unistack.org/micro/v4/selector"
 | 
			
		||||
	"go.unistack.org/micro/v4/semconv"
 | 
			
		||||
	"go.unistack.org/micro/v4/tracer"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
	"google.golang.org/grpc/codes"
 | 
			
		||||
	"google.golang.org/grpc/credentials"
 | 
			
		||||
@@ -36,12 +34,10 @@ const (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type grpcClient struct {
 | 
			
		||||
	funcPublish      client.FuncPublish
 | 
			
		||||
	funcBatchPublish client.FuncBatchPublish
 | 
			
		||||
	funcCall         client.FuncCall
 | 
			
		||||
	funcStream       client.FuncStream
 | 
			
		||||
	pool             *ConnPool
 | 
			
		||||
	opts             client.Options
 | 
			
		||||
	funcCall   client.FuncCall
 | 
			
		||||
	funcStream client.FuncStream
 | 
			
		||||
	pool       *ConnPool
 | 
			
		||||
	opts       client.Options
 | 
			
		||||
	sync.RWMutex
 | 
			
		||||
	init bool
 | 
			
		||||
}
 | 
			
		||||
@@ -78,15 +74,12 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
 | 
			
		||||
	var header map[string]string
 | 
			
		||||
	var header map[string][]string
 | 
			
		||||
 | 
			
		||||
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
 | 
			
		||||
		header = make(map[string]string, len(md))
 | 
			
		||||
		for k, v := range md {
 | 
			
		||||
			header[strings.ToLower(k)] = v
 | 
			
		||||
		}
 | 
			
		||||
		header = metadata.Copy(md)
 | 
			
		||||
	} else {
 | 
			
		||||
		header = make(map[string]string, 2)
 | 
			
		||||
		header = make(map[string][]string, 2)
 | 
			
		||||
	}
 | 
			
		||||
	if opts.RequestMetadata != nil {
 | 
			
		||||
		for k, v := range opts.RequestMetadata {
 | 
			
		||||
@@ -94,12 +87,11 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// set timeout in nanoseconds
 | 
			
		||||
	header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
 | 
			
		||||
	header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
 | 
			
		||||
	header["content-type"] = req.ContentType()
 | 
			
		||||
	header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
 | 
			
		||||
	header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
 | 
			
		||||
	header["content-type"] = append(header["content-type"], req.ContentType())
 | 
			
		||||
 | 
			
		||||
	md := gmetadata.New(header)
 | 
			
		||||
	ctx = gmetadata.NewOutgoingContext(ctx, md)
 | 
			
		||||
	ctx = gmetadata.NewOutgoingContext(ctx, header)
 | 
			
		||||
 | 
			
		||||
	cf, err := g.newCodec(req.ContentType())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -185,7 +177,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
 | 
			
		||||
	if opts.ResponseMetadata != nil {
 | 
			
		||||
		*opts.ResponseMetadata = metadata.New(gmd.Len())
 | 
			
		||||
		for k, v := range gmd {
 | 
			
		||||
			opts.ResponseMetadata.Set(k, strings.Join(v, ","))
 | 
			
		||||
			opts.ResponseMetadata.Append(k, v...)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -193,27 +185,23 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
 | 
			
		||||
	var header map[string]string
 | 
			
		||||
	var header map[string][]string
 | 
			
		||||
 | 
			
		||||
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
 | 
			
		||||
		header = make(map[string]string, len(md))
 | 
			
		||||
		for k, v := range md {
 | 
			
		||||
			header[k] = v
 | 
			
		||||
		}
 | 
			
		||||
		header = metadata.Copy(md)
 | 
			
		||||
	} else {
 | 
			
		||||
		header = make(map[string]string)
 | 
			
		||||
		header = make(map[string][]string)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// set timeout in nanoseconds
 | 
			
		||||
	if opts.StreamTimeout > time.Duration(0) {
 | 
			
		||||
		header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
 | 
			
		||||
		header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
 | 
			
		||||
		header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
 | 
			
		||||
		header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
 | 
			
		||||
	}
 | 
			
		||||
	// set the content type for the request
 | 
			
		||||
	header["content-type"] = req.ContentType()
 | 
			
		||||
	header["content-type"] = append(header["content-type"], req.ContentType())
 | 
			
		||||
 | 
			
		||||
	md := gmetadata.New(header)
 | 
			
		||||
	ctx = gmetadata.NewOutgoingContext(ctx, md)
 | 
			
		||||
	ctx = gmetadata.NewOutgoingContext(ctx, header)
 | 
			
		||||
 | 
			
		||||
	cf, err := g.newCodec(req.ContentType())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -418,8 +406,6 @@ func (g *grpcClient) Init(opts ...client.Option) error {
 | 
			
		||||
 | 
			
		||||
	g.funcCall = g.fnCall
 | 
			
		||||
	g.funcStream = g.fnStream
 | 
			
		||||
	g.funcPublish = g.fnPublish
 | 
			
		||||
	g.funcBatchPublish = g.fnBatchPublish
 | 
			
		||||
 | 
			
		||||
	g.opts.Hooks.EachPrev(func(hook options.Hook) {
 | 
			
		||||
		switch h := hook.(type) {
 | 
			
		||||
@@ -427,12 +413,9 @@ func (g *grpcClient) Init(opts ...client.Option) error {
 | 
			
		||||
			g.funcCall = h(g.funcCall)
 | 
			
		||||
		case client.HookStream:
 | 
			
		||||
			g.funcStream = h(g.funcStream)
 | 
			
		||||
		case client.HookPublish:
 | 
			
		||||
			g.funcPublish = h(g.funcPublish)
 | 
			
		||||
		case client.HookBatchPublish:
 | 
			
		||||
			g.funcBatchPublish = h(g.funcBatchPublish)
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
	g.init = true
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -441,10 +424,6 @@ func (g *grpcClient) Options() client.Options {
 | 
			
		||||
	return g.opts
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
 | 
			
		||||
	return newGRPCEvent(topic, msg, g.opts.ContentType, opts...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
 | 
			
		||||
	return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
 | 
			
		||||
}
 | 
			
		||||
@@ -763,84 +742,6 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
 | 
			
		||||
	return nil, grr
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
	return g.funcBatchPublish(ctx, ps, opts...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
	return g.publish(ctx, ps, opts...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
	return g.funcPublish(ctx, p, opts...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
	return g.publish(ctx, []client.Message{p}, opts...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
	var body []byte
 | 
			
		||||
 | 
			
		||||
	options := client.NewPublishOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	// get proxy
 | 
			
		||||
	exchange := ""
 | 
			
		||||
	if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
 | 
			
		||||
		exchange = v
 | 
			
		||||
	}
 | 
			
		||||
	// get the exchange
 | 
			
		||||
	if len(options.Exchange) > 0 {
 | 
			
		||||
		exchange = options.Exchange
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	msgs := make([]*broker.Message, 0, len(ps))
 | 
			
		||||
 | 
			
		||||
	omd, ok := metadata.FromOutgoingContext(ctx)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		omd = metadata.New(2)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, p := range ps {
 | 
			
		||||
		md := metadata.Copy(omd)
 | 
			
		||||
		topic := p.Topic()
 | 
			
		||||
		if len(exchange) > 0 {
 | 
			
		||||
			topic = exchange
 | 
			
		||||
		}
 | 
			
		||||
		md.Set(metadata.HeaderTopic, topic)
 | 
			
		||||
		iter := p.Metadata().Iterator()
 | 
			
		||||
		var k, v string
 | 
			
		||||
		for iter.Next(&k, &v) {
 | 
			
		||||
			md.Set(k, v)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		md[metadata.HeaderContentType] = p.ContentType()
 | 
			
		||||
 | 
			
		||||
		// passed in raw data
 | 
			
		||||
		if d, ok := p.Payload().(*codec.Frame); ok {
 | 
			
		||||
			body = d.Data
 | 
			
		||||
		} else {
 | 
			
		||||
			// use codec for payload
 | 
			
		||||
			cf, err := g.newCodec(p.ContentType())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return errors.InternalServerError("go.micro.client", "%+v", err)
 | 
			
		||||
			}
 | 
			
		||||
			// set the body
 | 
			
		||||
			b, err := cf.Marshal(p.Payload())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return errors.InternalServerError("go.micro.client", "%+v", err)
 | 
			
		||||
			}
 | 
			
		||||
			body = b
 | 
			
		||||
		}
 | 
			
		||||
		msgs = append(msgs, &broker.Message{Header: md, Body: body})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return g.opts.Broker.BatchPublish(ctx, msgs,
 | 
			
		||||
		broker.PublishContext(options.Context),
 | 
			
		||||
		broker.PublishBodyOnly(options.BodyOnly),
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) String() string {
 | 
			
		||||
	return "grpc"
 | 
			
		||||
}
 | 
			
		||||
@@ -920,8 +821,6 @@ func NewClient(opts ...client.Option) client.Client {
 | 
			
		||||
 | 
			
		||||
	c.funcCall = c.fnCall
 | 
			
		||||
	c.funcStream = c.fnStream
 | 
			
		||||
	c.funcPublish = c.fnPublish
 | 
			
		||||
	c.funcBatchPublish = c.fnBatchPublish
 | 
			
		||||
 | 
			
		||||
	return c
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										44
									
								
								message.go
									
									
									
									
									
								
							
							
						
						
									
										44
									
								
								message.go
									
									
									
									
									
								
							@@ -1,44 +0,0 @@
 | 
			
		||||
package grpc
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type grpcEvent struct {
 | 
			
		||||
	payload     interface{}
 | 
			
		||||
	topic       string
 | 
			
		||||
	contentType string
 | 
			
		||||
	opts        client.MessageOptions
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
 | 
			
		||||
	options := client.NewMessageOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	if len(options.ContentType) > 0 {
 | 
			
		||||
		contentType = options.ContentType
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &grpcEvent{
 | 
			
		||||
		payload:     payload,
 | 
			
		||||
		topic:       topic,
 | 
			
		||||
		contentType: contentType,
 | 
			
		||||
		opts:        options,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcEvent) ContentType() string {
 | 
			
		||||
	return g.contentType
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcEvent) Topic() string {
 | 
			
		||||
	return g.topic
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcEvent) Payload() interface{} {
 | 
			
		||||
	return g.payload
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcEvent) Metadata() metadata.Metadata {
 | 
			
		||||
	return g.opts.Metadata
 | 
			
		||||
}
 | 
			
		||||
@@ -4,7 +4,7 @@ package grpc
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v4/client"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
	"google.golang.org/grpc/encoding"
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -4,8 +4,8 @@ import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/codec"
 | 
			
		||||
	"go.unistack.org/micro/v4/client"
 | 
			
		||||
	"go.unistack.org/micro/v4/codec"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type grpcRequest struct {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										23
									
								
								response.go
									
									
									
									
									
								
							
							
						
						
									
										23
									
								
								response.go
									
									
									
									
									
								
							@@ -1,10 +1,8 @@
 | 
			
		||||
package grpc
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/codec"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
	"go.unistack.org/micro/v4/codec"
 | 
			
		||||
	"go.unistack.org/micro/v4/metadata"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -25,14 +23,19 @@ func (r *response) Header() metadata.Metadata {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	md := metadata.New(len(meta))
 | 
			
		||||
	for k, v := range meta {
 | 
			
		||||
		md.Set(k, strings.Join(v, ","))
 | 
			
		||||
	}
 | 
			
		||||
	return md
 | 
			
		||||
 | 
			
		||||
	return metadata.Metadata(meta.Copy())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Read the undecoded response
 | 
			
		||||
func (r *response) Read() ([]byte, error) {
 | 
			
		||||
	return nil, nil
 | 
			
		||||
	f := &codec.Frame{}
 | 
			
		||||
	wrap := &wrapStream{r.stream}
 | 
			
		||||
 | 
			
		||||
	_, err := wrap.Read(f.Data)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return f.Data, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user