diff --git a/codec.go b/codec.go index 4851885..860e628 100644 --- a/codec.go +++ b/codec.go @@ -1,7 +1,7 @@ package grpc import ( - "go.unistack.org/micro/v3/codec" + "go.unistack.org/micro/v4/codec" "google.golang.org/grpc/encoding" ) diff --git a/error.go b/error.go index 170f169..c1a6cec 100644 --- a/error.go +++ b/error.go @@ -6,7 +6,7 @@ import ( "net/http" "os" - "go.unistack.org/micro/v3/errors" + "go.unistack.org/micro/v4/errors" "google.golang.org/grpc/codes" ) diff --git a/generate.go b/generate.go index 5ea1f90..c351a9c 100644 --- a/generate.go +++ b/generate.go @@ -1,4 +1,4 @@ package grpc //go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest -//go:generate sh -c "protoc -I./proto -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) -I. --go-grpc_out=paths=source_relative:./proto --go_out=paths=source_relative:./proto proto/test.proto" +//go:generate sh -c "protoc -I./proto -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) -I. --go-grpc_out=paths=source_relative:./proto --go_out=paths=source_relative:./proto proto/test.proto" diff --git a/go.mod b/go.mod index defa97b..0c9683a 100644 --- a/go.mod +++ b/go.mod @@ -1,23 +1,22 @@ -module go.unistack.org/micro-server-grpc/v3 +module go.unistack.org/micro-server-grpc/v4 go 1.22.0 - require ( - go.unistack.org/micro/v3 v3.11.30 - golang.org/x/net v0.33.0 - google.golang.org/grpc v1.69.2 - google.golang.org/protobuf v1.36.1 + go.unistack.org/micro/v4 v4.1.2 + golang.org/x/net v0.35.0 + google.golang.org/grpc v1.70.0 + google.golang.org/protobuf v1.36.5 ) require ( github.com/ash3in/uuidv8 v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/kr/pretty v0.3.1 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect - go.unistack.org/micro-proto/v3 v3.4.1 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/text v0.21.0 // indirect + github.com/spf13/cast v1.7.1 // indirect + go.unistack.org/micro-proto/v4 v4.1.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 47e3cc3..141b7be 100644 --- a/go.sum +++ b/go.sum @@ -2,7 +2,8 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -19,36 +20,36 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4= github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= -go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= -go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= -go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= -go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= -go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -go.unistack.org/micro/v3 v3.11.30 h1:XTLgZubSGzQL85IUMp1pTJnS1lP4eFwTZyelV/SzOMc= -go.unistack.org/micro/v3 v3.11.30/go.mod h1:fvOkXKs3wKHToWH6Mxy+aovEiDl2q4UlOCdVfJdziBU= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= +github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk= +go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= +go.unistack.org/micro/v4 v4.1.2 h1:9SOlPYyPNNFpg1A7BsvhDyQm3gysLH1AhWbDCp1hyoY= +go.unistack.org/micro/v4 v4.1.2/go.mod h1:lr3oYED8Ay1vjK68QqRw30QOtdk/ffpZqMFDasOUhKw= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A= google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= -google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= -google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/grpc.go b/grpc.go index 3af6dee..4549c7a 100644 --- a/grpc.go +++ b/grpc.go @@ -10,7 +10,6 @@ import ( "slices" "sort" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -19,18 +18,16 @@ import ( reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1" // nolint: staticcheck - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/codec" - "go.unistack.org/micro/v3/errors" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/meter" - "go.unistack.org/micro/v3/options" - "go.unistack.org/micro/v3/register" - "go.unistack.org/micro/v3/semconv" - "go.unistack.org/micro/v3/server" - msync "go.unistack.org/micro/v3/sync" - "go.unistack.org/micro/v3/tracer" + + "go.unistack.org/micro/v4/errors" + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/metadata" + "go.unistack.org/micro/v4/meter" + "go.unistack.org/micro/v4/options" + "go.unistack.org/micro/v4/register" + "go.unistack.org/micro/v4/semconv" + "go.unistack.org/micro/v4/server" + "go.unistack.org/micro/v4/tracer" "golang.org/x/net/netutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -46,13 +43,6 @@ const ( DefaultContentType = "application/grpc" ) -/* -type ServerReflection struct { - srv *grpc.Server - s *serverReflectionServer -} -*/ - type streamWrapper struct { ctx context.Context grpc.ServerStream @@ -69,9 +59,7 @@ type Server struct { handlers map[string]server.Handler srv *grpc.Server exit chan chan error - wg *msync.WaitGroup rsvc *register.Service - subscribers map[*subscriber][]broker.Subscriber rpc *rServer opts server.Options unknownHandler grpc.StreamHandler @@ -92,7 +80,6 @@ func newServer(opts ...server.Option) *Server { serviceMap: make(map[string]*service), }, handlers: make(map[string]server.Handler), - subscribers: make(map[*subscriber][]broker.Subscriber), exit: make(chan chan error), stateLive: &atomic.Uint32{}, stateReady: &atomic.Uint32{}, @@ -104,22 +91,6 @@ func newServer(opts ...server.Option) *Server { return g } -/* -type grpcRouter struct { - h func(context.Context, server.Request, interface{}) error - m func(context.Context, server.Message) error -} - -func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error { - return r.m(ctx, msg) -} - -func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { - return r.h(ctx, req, rsp) -} - -*/ - func (g *Server) configure(opts ...server.Option) error { g.Lock() defer g.Unlock() @@ -215,8 +186,9 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { return status.Errorf(codes.Internal, "method does not exist in context") } + var gmd map[string][]string // get grpc metadata - gmd, ok := gmetadata.FromIncomingContext(ctx) + gmd, ok = gmetadata.FromIncomingContext(ctx) if !ok { gmd = gmetadata.MD{} } @@ -249,25 +221,23 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { }() } - md := metadata.New(len(gmd)) - for k, v := range gmd { - md[k] = strings.Join(v, ", ") - } - md.Set("Path", fullMethod) - md.Set("Micro-Server", "grpc") + md := metadata.Copy(gmd) + + md.Set("path", fullMethod) + md.Set("micro-server", "grpc") md.Set(metadata.HeaderEndpoint, methodName) md.Set(metadata.HeaderService, serviceName) var td string // timeout for server deadline - if v, ok := md.Get("timeout"); ok { + if v, ok := md.Get("timeout"); ok && len(v) > 0 { md.Del("timeout") - td = v + td = v[0] } - if v, ok := md.Get("Grpc-Timeout"); ok { - md.Del("Grpc-Timeout") - td = v[:len(v)-1] - switch v[len(v)-1:] { + if v, ok := md.Get("grpc-timeout"); ok && len(v) > 0 { + md.Del("grpc-timeout") + td = v[0][:len(v)-1] + switch v[0][len(v)-1:] { case "S": td += "s" case "M": @@ -286,11 +256,8 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { // get content type ct := DefaultContentType - if ctype, ok := md.Get("content-type"); ok { - ct = ctype - } else if ctype, ok := md.Get("x-content-type"); ok { - ct = ctype - md.Del("x-content-type") + if ctype, ok := md.Get("content-type"); ok && len(ctype) > 0 { + ct = ctype[0] } // create new context @@ -323,7 +290,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { // get peer from context if p, ok := peer.FromContext(ctx); ok { - md.Set("Remote", p.Addr.String()) + md.Set("remote", p.Addr.String()) ctx = peer.NewContext(ctx, p) } @@ -431,7 +398,7 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s // execute the handler appErr := fn(ctx, r, replyv.Interface()) if outmd, ok := metadata.FromOutgoingContext(ctx); ok { - if err = stream.SendHeader(gmetadata.New(outmd)); err != nil { + if err = stream.SendHeader(gmetadata.MD(outmd.Copy())); err != nil { return err } } @@ -515,7 +482,7 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se appErr := fn(ctx, r, ss) if outmd, ok := metadata.FromOutgoingContext(ctx); ok { - if err := stream.SendHeader(gmetadata.New(outmd)); err != nil { + if err := stream.SendHeader(gmetadata.MD(outmd.Copy())); err != nil { return err } } @@ -551,21 +518,6 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se return status.New(statusCode, statusDesc).Err() } -func (g *Server) newCodec(ct string) (codec.Codec, error) { - g.RLock() - defer g.RUnlock() - - if idx := strings.IndexRune(ct, ';'); idx >= 0 { - ct = ct[:idx] - } - - if c, ok := g.opts.Codecs[ct]; ok { - return c, nil - } - - return nil, codec.ErrUnknownContentType -} - func (g *Server) Options() server.Options { g.RLock() opts := g.opts @@ -591,34 +543,6 @@ func (g *Server) Handle(h server.Handler) error { return nil } -func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber { - return newSubscriber(topic, sb, opts...) -} - -func (g *Server) Subscribe(sb server.Subscriber) error { - sub, ok := sb.(*subscriber) - if !ok { - return fmt.Errorf("invalid subscriber: expected *subscriber") - } - if len(sub.handlers) == 0 { - return fmt.Errorf("invalid subscriber: no handler functions") - } - - if err := server.ValidateSubscriber(sb); err != nil { - return err - } - - g.Lock() - if _, ok = g.subscribers[sub]; ok { - g.Unlock() - return fmt.Errorf("subscriber %v already exists", sub) - } - - g.subscribers[sub] = nil - g.Unlock() - return nil -} - func (g *Server) Register() error { g.RLock() rsvc := g.rsvc @@ -648,15 +572,6 @@ func (g *Server) Register() error { sort.Strings(handlerList) - subscriberList := make([]*subscriber, 0, len(g.subscribers)) - for e := range g.subscribers { - // Only advertise non internal subscribers - subscriberList = append(subscriberList, e) - } - sort.Slice(subscriberList, func(i, j int) bool { - return subscriberList[i].topic > subscriberList[j].topic - }) - g.RUnlock() g.RLock() @@ -718,26 +633,6 @@ func (g *Server) Deregister() error { g.registered = false - wg := sync.WaitGroup{} - for sb, subs := range g.subscribers { - for _, sub := range subs { - wg.Add(1) - go func(s broker.Subscriber) { - defer wg.Done() - if config.Logger.V(logger.InfoLevel) { - config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic()) - } - if err := s.Unsubscribe(g.opts.Context); err != nil { - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(config.Context, "Unsubscribing from topic: "+s.Topic(), err) - } - } - }(sub) - } - g.subscribers[sb] = nil - } - wg.Wait() - g.Unlock() return nil } @@ -785,21 +680,6 @@ func (g *Server) Start() error { } g.Unlock() - // only connect if we're subscribed - if len(g.subscribers) > 0 { - // connect to the broker - if err = config.Broker.Connect(config.Context); err != nil { - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err) - } - return err - } - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())) - } - } - // use RegisterCheck func before register // nolint: nestif if err = g.opts.RegisterCheck(config.Context); err != nil { @@ -815,10 +695,6 @@ func (g *Server) Start() error { } } - if err = g.subscribe(); err != nil { - return err - } - // micro: go ts.Accept(s.accept) go func() { if err = g.srv.Serve(ts); err != nil { diff --git a/handler.go b/handler.go index af7af8f..2b5017a 100644 --- a/handler.go +++ b/handler.go @@ -3,7 +3,7 @@ package grpc import ( "reflect" - "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v4/server" ) type rpcHandler struct { diff --git a/options.go b/options.go index be712aa..f730f0c 100644 --- a/options.go +++ b/options.go @@ -3,7 +3,7 @@ package grpc import ( "context" - "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v4/server" "google.golang.org/grpc" "google.golang.org/grpc/encoding" ) diff --git a/reflection_test.go b/reflection_test.go index 4a2780f..48320b4 100644 --- a/reflection_test.go +++ b/reflection_test.go @@ -3,9 +3,8 @@ package grpc import ( "fmt" "testing" - - _ "go.unistack.org/micro-server-grpc/v3/proto" - "go.unistack.org/micro/v3/server" + + "go.unistack.org/micro/v4/server" "google.golang.org/grpc" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoregistry" diff --git a/request.go b/request.go index 86260be..ed9f520 100644 --- a/request.go +++ b/request.go @@ -1,9 +1,9 @@ package grpc import ( - "go.unistack.org/micro/v3/codec" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v4/codec" + "go.unistack.org/micro/v4/metadata" + "go.unistack.org/micro/v4/server" ) var _ server.Request = &rpcRequest{} diff --git a/response.go b/response.go index b1ad9b7..a317d28 100644 --- a/response.go +++ b/response.go @@ -1,9 +1,9 @@ package grpc import ( - "go.unistack.org/micro/v3/codec" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v4/codec" + "go.unistack.org/micro/v4/metadata" + "go.unistack.org/micro/v4/server" ) var _ server.Response = &rpcResponse{} diff --git a/server.go b/server.go index c1e064c..73f0851 100644 --- a/server.go +++ b/server.go @@ -14,7 +14,7 @@ import ( "unicode" "unicode/utf8" - "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v4/server" ) // Precompute the reflect type for error. Can't use error directly @@ -47,8 +47,8 @@ type rServer struct { // Is this an exported - upper case - name? func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) + r, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(r) } // Is this type exported or a builtin? @@ -123,43 +123,43 @@ func prepareEndpoint(method reflect.Method) (*methodType, error) { return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil } -func (server *rServer) register(rcvr interface{}) error { - server.mu.Lock() - defer server.mu.Unlock() - if server.serviceMap == nil { - server.serviceMap = make(map[string]*service) +func (s *rServer) register(rcvr interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.serviceMap == nil { + s.serviceMap = make(map[string]*service) } - s := &service{} - s.typ = reflect.TypeOf(rcvr) - s.rcvr = reflect.ValueOf(rcvr) - sname := reflect.Indirect(s.rcvr).Type().Name() + srv := &service{} + srv.typ = reflect.TypeOf(rcvr) + srv.rcvr = reflect.ValueOf(rcvr) + sname := reflect.Indirect(srv.rcvr).Type().Name() if sname == "" { - return fmt.Errorf("rpc: no service name for type %v", s.typ.String()) + return fmt.Errorf("rpc: no service name for type %v", srv.typ.String()) } if !isExported(sname) { return fmt.Errorf("rpc Register: type %s is not exported", sname) } - if _, present := server.serviceMap[sname]; present { + if _, present := s.serviceMap[sname]; present { return fmt.Errorf("rpc: service already defined: %s", sname) } - s.name = sname - s.method = make(map[string]*methodType) + srv.name = sname + srv.method = make(map[string]*methodType) // Install the methods - for m := 0; m < s.typ.NumMethod(); m++ { - method := s.typ.Method(m) + for m := 0; m < srv.typ.NumMethod(); m++ { + method := srv.typ.Method(m) mt, err := prepareEndpoint(method) if mt != nil && err == nil { - s.method[method.Name] = mt + srv.method[method.Name] = mt } else if err != nil { return err } } - if len(s.method) == 0 { + if len(srv.method) == 0 { return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname) } - server.serviceMap[s.name] = s + s.serviceMap[srv.name] = srv return nil } diff --git a/stream.go b/stream.go index 96447e5..cad4d55 100644 --- a/stream.go +++ b/stream.go @@ -3,7 +3,7 @@ package grpc import ( "context" - "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v4/server" "google.golang.org/grpc" ) diff --git a/subscriber.go b/subscriber.go deleted file mode 100644 index e3ef6b3..0000000 --- a/subscriber.go +++ /dev/null @@ -1,258 +0,0 @@ -package grpc - -import ( - "context" - "fmt" - "reflect" - "strings" - - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/codec" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/options" - "go.unistack.org/micro/v3/server" -) - -var _ server.Message = &rpcMessage{} - -type rpcMessage struct { - payload interface{} - codec codec.Codec - header metadata.Metadata - topic string - contentType string -} - -func (r *rpcMessage) ContentType() string { - return r.contentType -} - -func (r *rpcMessage) Topic() string { - return r.topic -} - -func (r *rpcMessage) Body() interface{} { - return r.payload -} - -func (r *rpcMessage) Header() metadata.Metadata { - return r.header -} - -func (r *rpcMessage) Codec() codec.Codec { - return r.codec -} - -type handler struct { - reqType reflect.Type - ctxType reflect.Type - method reflect.Value -} - -type subscriber struct { - topic string - rcvr reflect.Value - typ reflect.Type - subscriber interface{} - handlers []*handler - opts server.SubscriberOptions -} - -func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { - options := server.NewSubscriberOptions(opts...) - - var handlers []*handler - - if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { - h := &handler{ - method: reflect.ValueOf(sub), - } - - switch typ.NumIn() { - case 1: - h.reqType = typ.In(0) - case 2: - h.ctxType = typ.In(0) - h.reqType = typ.In(1) - } - - handlers = append(handlers, h) - - } else { - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - h := &handler{ - method: method.Func, - } - - switch method.Type.NumIn() { - case 2: - h.reqType = method.Type.In(1) - case 3: - h.ctxType = method.Type.In(1) - h.reqType = method.Type.In(2) - } - - handlers = append(handlers, h) - - } - } - - return &subscriber{ - rcvr: reflect.ValueOf(sub), - typ: reflect.TypeOf(sub), - topic: topic, - subscriber: sub, - handlers: handlers, - opts: options, - } -} - -func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { - return func(p broker.Event) (err error) { - msg := p.Message() - // if we don't have headers, create empty map - if msg.Header == nil { - msg.Header = make(map[string]string) - } - - ct := msg.Header["Content-Type"] - if len(ct) == 0 { - msg.Header["Content-Type"] = DefaultContentType - ct = DefaultContentType - } - cf, err := g.newCodec(ct) - if err != nil { - return err - } - - hdr := make(map[string]string, len(msg.Header)) - for k, v := range msg.Header { - hdr[k] = v - } - - ctx := metadata.NewIncomingContext(sb.opts.Context, hdr) - - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var isVal bool - var req reflect.Value - - if handler.reqType.Kind() == reflect.Ptr { - req = reflect.New(handler.reqType.Elem()) - } else { - req = reflect.New(handler.reqType) - isVal = true - } - if isVal { - req = req.Elem() - } - - if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil { - return err - } - - fn := func(ctx context.Context, msg server.Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctx)) - } - - vals = append(vals, reflect.ValueOf(msg.Body())) - - returnValues := handler.method.Call(vals) - if rerr := returnValues[0].Interface(); rerr != nil { - return rerr.(error) - } - return nil - } - - opts.Hooks.EachPrev(func(hook options.Hook) { - if h, ok := hook.(server.HookSubHandler); ok { - fn = h(fn) - } - }) - - if g.wg != nil { - g.wg.Add(1) - } - go func() { - if g.wg != nil { - defer g.wg.Done() - } - cerr := fn(ctx, &rpcMessage{ - topic: sb.topic, - contentType: ct, - payload: req.Interface(), - header: msg.Header, - }) - results <- cerr - }() - } - var errors []string - for i := 0; i < len(sb.handlers); i++ { - if rerr := <-results; rerr != nil { - errors = append(errors, rerr.Error()) - } - } - if len(errors) > 0 { - err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - - return err - } -} - -func (s *subscriber) Topic() string { - return s.topic -} - -func (s *subscriber) Subscriber() interface{} { - return s.subscriber -} - -func (s *subscriber) Options() server.SubscriberOptions { - return s.opts -} - -func (g *Server) subscribe() error { - config := g.opts - subCtx := config.Context - - for sb := range g.subscribers { - - if cx := sb.Options().Context; cx != nil { - subCtx = cx - } - - opts := []broker.SubscribeOption{ - broker.SubscribeContext(subCtx), - broker.SubscribeAutoAck(sb.Options().AutoAck), - broker.SubscribeBodyOnly(sb.Options().BodyOnly), - } - - if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.SubscribeGroup(queue)) - } - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Info(config.Context, "subscribing to topic: "+sb.Topic()) - } - - sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), g.createSubHandler(sb, config), opts...) - if err != nil { - return err - } - - g.subscribers[sb] = []broker.Subscriber{sub} - } - - return nil -}