Compare commits

...

5 Commits

Author SHA1 Message Date
6856038abe add Path metadata
Some checks failed
build / test (push) Failing after 1m28s
build / lint (push) Failing after 2m38s
codeql / analyze (go) (push) Failing after 2m44s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-11-03 19:33:49 +03:00
786bbb7185 Merge pull request 'dont init twice tls listener' (#167) from tls into v3
Reviewed-on: #167
2023-06-12 18:30:41 +03:00
95207c9617 dont init twice tls listener
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-06-12 18:29:07 +03:00
d646deb468 Merge pull request 'check subscribe errors' (#164) from subscribeerr into v3
Reviewed-on: #164
2023-05-13 16:06:53 +03:00
468819f0a0 check subscribe errors
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-13 16:06:37 +03:00
3 changed files with 66 additions and 1070 deletions

18
go.mod
View File

@@ -1,11 +1,17 @@
module go.unistack.org/micro-server-grpc/v3
go 1.16
go 1.20
require (
github.com/golang/protobuf v1.5.2
go.unistack.org/micro/v3 v3.10.14
golang.org/x/net v0.5.0
google.golang.org/grpc v1.52.3
google.golang.org/protobuf v1.28.1
github.com/golang/protobuf v1.5.3
go.unistack.org/micro/v3 v3.10.31
golang.org/x/net v0.17.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
)
require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
)

1045
go.sum

File diff suppressed because it is too large Load Diff

73
grpc.go
View File

@@ -26,7 +26,6 @@ import (
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -140,10 +139,6 @@ func (g *Server) configure(opts ...server.Option) error {
grpc.UnknownServiceHandler(g.handler),
}
if creds := g.getCredentials(); creds != nil {
gopts = append(gopts, grpc.Creds(creds))
}
if opts := g.getGrpcOptions(); opts != nil {
gopts = append(opts, gopts...)
}
@@ -180,13 +175,6 @@ func (g *Server) getMaxMsgSize() int {
return s
}
func (g *Server) getCredentials() credentials.TransportCredentials {
if g.opts.TLSConfig != nil {
return credentials.NewTLS(g.opts.TLSConfig)
}
return nil
}
func (g *Server) getGrpcOptions() []grpc.ServerOption {
if g.opts.Context == nil {
return nil
@@ -246,6 +234,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
for k, v := range gmd {
md.Set(k, strings.Join(v, ", "))
}
md.Set("Path", fullMethod)
var td string
// timeout for server deadline
@@ -718,31 +707,6 @@ func (g *Server) Register() error {
g.Lock()
defer g.Unlock()
for sb := range g.subscribers {
handler := g.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
g.registered = true
g.rsvc = service
@@ -876,6 +840,10 @@ func (g *Server) Start() error {
}
}
if err = g.subscribe(); err != nil {
return err
}
// micro: go ts.Accept(s.accept)
go func() {
if err = g.srv.Serve(ts); err != nil {
@@ -987,6 +955,37 @@ func (g *Server) Start() error {
return nil
}
func (g *Server) subscribe() error {
config := g.opts
for sb := range g.subscribers {
handler := g.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (g *Server) Stop() error {
g.RLock()
if !g.started {