Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
46891c397f | |||
6856038abe | |||
786bbb7185 | |||
95207c9617 | |||
d646deb468 | |||
468819f0a0 |
18
go.mod
18
go.mod
@@ -1,11 +1,17 @@
|
|||||||
module go.unistack.org/micro-server-grpc/v3
|
module go.unistack.org/micro-server-grpc/v3
|
||||||
|
|
||||||
go 1.16
|
go 1.20
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/golang/protobuf v1.5.2
|
github.com/golang/protobuf v1.5.3
|
||||||
go.unistack.org/micro/v3 v3.10.14
|
go.unistack.org/micro/v3 v3.10.31
|
||||||
golang.org/x/net v0.5.0
|
golang.org/x/net v0.17.0
|
||||||
google.golang.org/grpc v1.52.3
|
google.golang.org/grpc v1.59.0
|
||||||
google.golang.org/protobuf v1.28.1
|
google.golang.org/protobuf v1.31.0
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
golang.org/x/sys v0.13.0 // indirect
|
||||||
|
golang.org/x/text v0.13.0 // indirect
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
|
||||||
)
|
)
|
||||||
|
76
grpc.go
76
grpc.go
@@ -26,7 +26,6 @@ import (
|
|||||||
"golang.org/x/net/netutil"
|
"golang.org/x/net/netutil"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/credentials"
|
|
||||||
"google.golang.org/grpc/encoding"
|
"google.golang.org/grpc/encoding"
|
||||||
gmetadata "google.golang.org/grpc/metadata"
|
gmetadata "google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
@@ -140,10 +139,6 @@ func (g *Server) configure(opts ...server.Option) error {
|
|||||||
grpc.UnknownServiceHandler(g.handler),
|
grpc.UnknownServiceHandler(g.handler),
|
||||||
}
|
}
|
||||||
|
|
||||||
if creds := g.getCredentials(); creds != nil {
|
|
||||||
gopts = append(gopts, grpc.Creds(creds))
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts := g.getGrpcOptions(); opts != nil {
|
if opts := g.getGrpcOptions(); opts != nil {
|
||||||
gopts = append(opts, gopts...)
|
gopts = append(opts, gopts...)
|
||||||
}
|
}
|
||||||
@@ -180,13 +175,6 @@ func (g *Server) getMaxMsgSize() int {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) getCredentials() credentials.TransportCredentials {
|
|
||||||
if g.opts.TLSConfig != nil {
|
|
||||||
return credentials.NewTLS(g.opts.TLSConfig)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Server) getGrpcOptions() []grpc.ServerOption {
|
func (g *Server) getGrpcOptions() []grpc.ServerOption {
|
||||||
if g.opts.Context == nil {
|
if g.opts.Context == nil {
|
||||||
return nil
|
return nil
|
||||||
@@ -246,6 +234,10 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
|
|||||||
for k, v := range gmd {
|
for k, v := range gmd {
|
||||||
md.Set(k, strings.Join(v, ", "))
|
md.Set(k, strings.Join(v, ", "))
|
||||||
}
|
}
|
||||||
|
md.Set("Path", fullMethod)
|
||||||
|
md.Set("Micro-Server", "grpc")
|
||||||
|
md.Set(metadata.HeaderEndpoint, methodName)
|
||||||
|
md.Set(metadata.HeaderService, serviceName)
|
||||||
|
|
||||||
var td string
|
var td string
|
||||||
// timeout for server deadline
|
// timeout for server deadline
|
||||||
@@ -718,31 +710,6 @@ func (g *Server) Register() error {
|
|||||||
g.Lock()
|
g.Lock()
|
||||||
defer g.Unlock()
|
defer g.Unlock()
|
||||||
|
|
||||||
for sb := range g.subscribers {
|
|
||||||
handler := g.createSubHandler(sb, config)
|
|
||||||
var opts []broker.SubscribeOption
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
|
||||||
}
|
|
||||||
|
|
||||||
subCtx := config.Context
|
|
||||||
if cx := sb.Options().Context; cx != nil {
|
|
||||||
subCtx = cx
|
|
||||||
}
|
|
||||||
opts = append(opts, broker.SubscribeContext(subCtx))
|
|
||||||
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
|
||||||
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
|
||||||
}
|
|
||||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
g.subscribers[sb] = []broker.Subscriber{sub}
|
|
||||||
}
|
|
||||||
|
|
||||||
g.registered = true
|
g.registered = true
|
||||||
g.rsvc = service
|
g.rsvc = service
|
||||||
|
|
||||||
@@ -876,6 +843,10 @@ func (g *Server) Start() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = g.subscribe(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// micro: go ts.Accept(s.accept)
|
// micro: go ts.Accept(s.accept)
|
||||||
go func() {
|
go func() {
|
||||||
if err = g.srv.Serve(ts); err != nil {
|
if err = g.srv.Serve(ts); err != nil {
|
||||||
@@ -987,6 +958,37 @@ func (g *Server) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *Server) subscribe() error {
|
||||||
|
config := g.opts
|
||||||
|
|
||||||
|
for sb := range g.subscribers {
|
||||||
|
handler := g.createSubHandler(sb, config)
|
||||||
|
var opts []broker.SubscribeOption
|
||||||
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
subCtx := config.Context
|
||||||
|
if cx := sb.Options().Context; cx != nil {
|
||||||
|
subCtx = cx
|
||||||
|
}
|
||||||
|
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||||
|
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||||
|
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
||||||
|
}
|
||||||
|
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
g.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (g *Server) Stop() error {
|
func (g *Server) Stop() error {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
if !g.started {
|
if !g.started {
|
||||||
|
Reference in New Issue
Block a user