prepare v4
Some checks failed
lint / lint (pull_request) Failing after 1m38s
test / test (pull_request) Successful in 3m20s

This commit is contained in:
2025-02-26 20:32:20 +03:00
parent 134f7374aa
commit c0d186327e
14 changed files with 83 additions and 423 deletions

125
grpc.go
View File

@@ -19,18 +19,16 @@ import (
reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1"
// nolint: staticcheck
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/server"
msync "go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/semconv"
"go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v4/tracer"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -69,9 +67,7 @@ type Server struct {
handlers map[string]server.Handler
srv *grpc.Server
exit chan chan error
wg *msync.WaitGroup
rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer
opts server.Options
unknownHandler grpc.StreamHandler
@@ -92,7 +88,6 @@ func newServer(opts ...server.Option) *Server {
serviceMap: make(map[string]*service),
},
handlers: make(map[string]server.Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
stateLive: &atomic.Uint32{},
stateReady: &atomic.Uint32{},
@@ -251,7 +246,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
md := metadata.New(len(gmd))
for k, v := range gmd {
md[k] = strings.Join(v, ", ")
md[k] = v
}
md.Set("Path", fullMethod)
md.Set("Micro-Server", "grpc")
@@ -260,14 +255,14 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
var td string
// timeout for server deadline
if v, ok := md.Get("timeout"); ok {
if v, ok := md.Get("timeout"); ok && len(v) > 0 {
md.Del("timeout")
td = v
td = v[0]
}
if v, ok := md.Get("Grpc-Timeout"); ok {
if v, ok := md.Get("Grpc-Timeout"); ok && len(v) > 0 {
md.Del("Grpc-Timeout")
td = v[:len(v)-1]
switch v[len(v)-1:] {
td = v[0][:len(v)-1]
switch v[0][len(v)-1:] {
case "S":
td += "s"
case "M":
@@ -286,10 +281,10 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
// get content type
ct := DefaultContentType
if ctype, ok := md.Get("content-type"); ok {
ct = ctype
} else if ctype, ok := md.Get("x-content-type"); ok {
ct = ctype
if ctype, ok := md.Get("content-type"); ok && len(ctype) > 0 {
ct = ctype[0]
} else if ctype, ok := md.Get("x-content-type"); ok && len(ctype) > 0 {
ct = ctype[0]
md.Del("x-content-type")
}
@@ -431,7 +426,7 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
// execute the handler
appErr := fn(ctx, r, replyv.Interface())
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
if err = stream.SendHeader(gmetadata.New(outmd)); err != nil {
if err = stream.SendHeader(gmetadata.MD(outmd.Copy())); err != nil {
return err
}
}
@@ -515,7 +510,7 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
appErr := fn(ctx, r, ss)
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
if err := stream.SendHeader(gmetadata.New(outmd)); err != nil {
if err := stream.SendHeader(gmetadata.MD(outmd.Copy())); err != nil {
return err
}
}
@@ -591,34 +586,6 @@ func (g *Server) Handle(h server.Handler) error {
return nil
}
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, sb, opts...)
}
func (g *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := server.ValidateSubscriber(sb); err != nil {
return err
}
g.Lock()
if _, ok = g.subscribers[sub]; ok {
g.Unlock()
return fmt.Errorf("subscriber %v already exists", sub)
}
g.subscribers[sub] = nil
g.Unlock()
return nil
}
func (g *Server) Register() error {
g.RLock()
rsvc := g.rsvc
@@ -648,15 +615,6 @@ func (g *Server) Register() error {
sort.Strings(handlerList)
subscriberList := make([]*subscriber, 0, len(g.subscribers))
for e := range g.subscribers {
// Only advertise non internal subscribers
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
g.RUnlock()
g.RLock()
@@ -718,26 +676,6 @@ func (g *Server) Deregister() error {
g.registered = false
wg := sync.WaitGroup{}
for sb, subs := range g.subscribers {
for _, sub := range subs {
wg.Add(1)
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(g.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, "Unsubscribing from topic: "+s.Topic(), err)
}
}
}(sub)
}
g.subscribers[sb] = nil
}
wg.Wait()
g.Unlock()
return nil
}
@@ -785,21 +723,6 @@ func (g *Server) Start() error {
}
g.Unlock()
// only connect if we're subscribed
if len(g.subscribers) > 0 {
// connect to the broker
if err = config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
}
}
// use RegisterCheck func before register
// nolint: nestif
if err = g.opts.RegisterCheck(config.Context); err != nil {
@@ -815,10 +738,6 @@ func (g *Server) Start() error {
}
}
if err = g.subscribe(); err != nil {
return err
}
// micro: go ts.Accept(s.accept)
go func() {
if err = g.srv.Serve(ts); err != nil {