prepare v4 (need swap target!) (#195)
All checks were successful
test / test (push) Successful in 1m53s

move to v4 micro

Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Reviewed-on: #195
Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru>
Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
This commit is contained in:
2025-03-02 21:13:25 +03:00
parent 134f7374aa
commit 23a1ff424e
14 changed files with 99 additions and 482 deletions

178
grpc.go
View File

@@ -10,7 +10,6 @@ import (
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -19,18 +18,16 @@ import (
reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1"
// nolint: staticcheck
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/server"
msync "go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/semconv"
"go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v4/tracer"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -46,13 +43,6 @@ const (
DefaultContentType = "application/grpc"
)
/*
type ServerReflection struct {
srv *grpc.Server
s *serverReflectionServer
}
*/
type streamWrapper struct {
ctx context.Context
grpc.ServerStream
@@ -69,9 +59,7 @@ type Server struct {
handlers map[string]server.Handler
srv *grpc.Server
exit chan chan error
wg *msync.WaitGroup
rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer
opts server.Options
unknownHandler grpc.StreamHandler
@@ -92,7 +80,6 @@ func newServer(opts ...server.Option) *Server {
serviceMap: make(map[string]*service),
},
handlers: make(map[string]server.Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
stateLive: &atomic.Uint32{},
stateReady: &atomic.Uint32{},
@@ -104,22 +91,6 @@ func newServer(opts ...server.Option) *Server {
return g
}
/*
type grpcRouter struct {
h func(context.Context, server.Request, interface{}) error
m func(context.Context, server.Message) error
}
func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error {
return r.m(ctx, msg)
}
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
return r.h(ctx, req, rsp)
}
*/
func (g *Server) configure(opts ...server.Option) error {
g.Lock()
defer g.Unlock()
@@ -215,8 +186,9 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
return status.Errorf(codes.Internal, "method does not exist in context")
}
var gmd map[string][]string
// get grpc metadata
gmd, ok := gmetadata.FromIncomingContext(ctx)
gmd, ok = gmetadata.FromIncomingContext(ctx)
if !ok {
gmd = gmetadata.MD{}
}
@@ -249,25 +221,23 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
}()
}
md := metadata.New(len(gmd))
for k, v := range gmd {
md[k] = strings.Join(v, ", ")
}
md.Set("Path", fullMethod)
md.Set("Micro-Server", "grpc")
md := metadata.Copy(gmd)
md.Set("path", fullMethod)
md.Set("micro-server", "grpc")
md.Set(metadata.HeaderEndpoint, methodName)
md.Set(metadata.HeaderService, serviceName)
var td string
// timeout for server deadline
if v, ok := md.Get("timeout"); ok {
if v, ok := md.Get("timeout"); ok && len(v) > 0 {
md.Del("timeout")
td = v
td = v[0]
}
if v, ok := md.Get("Grpc-Timeout"); ok {
md.Del("Grpc-Timeout")
td = v[:len(v)-1]
switch v[len(v)-1:] {
if v, ok := md.Get("grpc-timeout"); ok && len(v) > 0 {
md.Del("grpc-timeout")
td = v[0][:len(v)-1]
switch v[0][len(v)-1:] {
case "S":
td += "s"
case "M":
@@ -286,11 +256,8 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
// get content type
ct := DefaultContentType
if ctype, ok := md.Get("content-type"); ok {
ct = ctype
} else if ctype, ok := md.Get("x-content-type"); ok {
ct = ctype
md.Del("x-content-type")
if ctype, ok := md.Get("content-type"); ok && len(ctype) > 0 {
ct = ctype[0]
}
// create new context
@@ -323,7 +290,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
// get peer from context
if p, ok := peer.FromContext(ctx); ok {
md.Set("Remote", p.Addr.String())
md.Set("remote", p.Addr.String())
ctx = peer.NewContext(ctx, p)
}
@@ -431,7 +398,7 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
// execute the handler
appErr := fn(ctx, r, replyv.Interface())
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
if err = stream.SendHeader(gmetadata.New(outmd)); err != nil {
if err = stream.SendHeader(gmetadata.MD(outmd.Copy())); err != nil {
return err
}
}
@@ -515,7 +482,7 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
appErr := fn(ctx, r, ss)
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
if err := stream.SendHeader(gmetadata.New(outmd)); err != nil {
if err := stream.SendHeader(gmetadata.MD(outmd.Copy())); err != nil {
return err
}
}
@@ -551,21 +518,6 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
return status.New(statusCode, statusDesc).Err()
}
func (g *Server) newCodec(ct string) (codec.Codec, error) {
g.RLock()
defer g.RUnlock()
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}
if c, ok := g.opts.Codecs[ct]; ok {
return c, nil
}
return nil, codec.ErrUnknownContentType
}
func (g *Server) Options() server.Options {
g.RLock()
opts := g.opts
@@ -591,34 +543,6 @@ func (g *Server) Handle(h server.Handler) error {
return nil
}
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, sb, opts...)
}
func (g *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := server.ValidateSubscriber(sb); err != nil {
return err
}
g.Lock()
if _, ok = g.subscribers[sub]; ok {
g.Unlock()
return fmt.Errorf("subscriber %v already exists", sub)
}
g.subscribers[sub] = nil
g.Unlock()
return nil
}
func (g *Server) Register() error {
g.RLock()
rsvc := g.rsvc
@@ -648,15 +572,6 @@ func (g *Server) Register() error {
sort.Strings(handlerList)
subscriberList := make([]*subscriber, 0, len(g.subscribers))
for e := range g.subscribers {
// Only advertise non internal subscribers
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
g.RUnlock()
g.RLock()
@@ -718,26 +633,6 @@ func (g *Server) Deregister() error {
g.registered = false
wg := sync.WaitGroup{}
for sb, subs := range g.subscribers {
for _, sub := range subs {
wg.Add(1)
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(g.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, "Unsubscribing from topic: "+s.Topic(), err)
}
}
}(sub)
}
g.subscribers[sb] = nil
}
wg.Wait()
g.Unlock()
return nil
}
@@ -785,21 +680,6 @@ func (g *Server) Start() error {
}
g.Unlock()
// only connect if we're subscribed
if len(g.subscribers) > 0 {
// connect to the broker
if err = config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
}
}
// use RegisterCheck func before register
// nolint: nestif
if err = g.opts.RegisterCheck(config.Context); err != nil {
@@ -815,10 +695,6 @@ func (g *Server) Start() error {
}
}
if err = g.subscribe(); err != nil {
return err
}
// micro: go ts.Accept(s.accept)
go func() {
if err = g.srv.Serve(ts); err != nil {